You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@lucene.apache.org by gu...@apache.org on 2019/06/11 14:18:11 UTC
[lucene-solr] branch master updated: SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.
This is an automated email from the ASF dual-hosted git repository.
gus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new abeecfa SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.
abeecfa is described below
commit abeecfacd40e8bc345e9e4aa8e719fee143d2088
Author: Gus Heck <gu...@apache.org>
AuthorDate: Tue Jun 11 10:12:52 2019 -0400
SOLR-13439 - Adds ability to locally cache collection properties for a specified duration.
---
.../overseer/ZkCollectionPropsCachingTest.java | 100 ++++++++++++++
.../apache/solr/common/cloud/ZkStateReader.java | 151 +++++++++++++++++----
2 files changed, 226 insertions(+), 25 deletions(-)
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
new file mode 100644
index 0000000..a765ada
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.overseer;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.CollectionProperties;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@LuceneTestCase.Slow
+@SolrTestCaseJ4.SuppressSSL
+public class ZkCollectionPropsCachingTest extends SolrCloudTestCase {
+  //
+  // NOTE: This class can only have one test because our test for caching is to nuke the SolrZkClient to
+  // verify that a cached load is going to hit the cache, not try to talk to zk. Any other ZK related test
+  // method in this class will fail if it runs after testReadWriteCached, so don't add one! :)
+  //
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** How long testReadWriteCached asks ZkStateReader to cache the collection properties. */
+  private static final long CACHE_MILLIS = 9000;
+
+  /** Extra time waited beyond CACHE_MILLIS before the cache is expected to have expired. */
+  private static final long EXPIRY_MARGIN_MILLIS = 1000;
+
+  private String collectionName;
+
+  /** Starts a 4-node cluster with the cloud-minimal configset uploaded as "conf". */
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    boolean useLegacyCloud = rarely();
+    log.info("Using legacyCloud?: {}", useLegacyCloud);
+
+    configureCluster(4)
+        .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+  }
+
+  /** Creates a uniquely named collection (2 shards x 2 replicas) for the test to use. */
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    collectionName = "CollectionPropsTest" + System.nanoTime();
+
+    CollectionAdminRequest.Create request = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2);
+    CollectionAdminResponse response = request.process(cluster.getSolrClient());
+    assertTrue("Unable to create collection: " + response.toString(), response.isSuccess());
+  }
+
+  /**
+   * Verifies that properties read with a cache duration are served from the cache while it is live
+   * (even after the reader's zk client is closed), and that reads go back to zk once it has expired.
+   */
+  @Test
+  public void testReadWriteCached() throws InterruptedException, IOException {
+    ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+
+    CollectionProperties collectionProps = new CollectionProperties(zkClient());
+
+    collectionProps.setCollectionProperty(collectionName, "property1", "value1");
+    checkValue("property1", "value1"); // nothing is cached yet, so the new value is read straight from zk
+
+    zkStateReader.getCollectionProperties(collectionName, CACHE_MILLIS);
+    zkStateReader.getZkClient().close();
+    assertFalse(zkStateReader.isClosed());
+    checkValue("property1", "value1"); // cached, so this read must not try to hit the (closed) zk client
+
+    Thread.sleep(CACHE_MILLIS + EXPIRY_MARGIN_MILLIS); // test the timeout feature
+    try {
+      checkValue("property1", "value1"); // should not be cached anymore
+      fail("cache should have expired, prev line should throw an exception trying to access zookeeper after closed");
+    } catch (SolrException e) {
+      // Expected: the cache expired, so the read went to the closed zk client and
+      // ZkStateReader reported that failure as a SolrException. Catching only SolrException
+      // keeps unrelated failures (e.g. an NPE) from passing as "expected".
+    }
+  }
+
+  /** Asserts that the reader's current value of {@code propertyName} equals {@code expectedValue}. */
+  private void checkValue(String propertyName, String expectedValue) throws InterruptedException {
+    final Object value = cluster.getSolrClient().getZkStateReader().getCollectionProperties(collectionName).get(propertyName);
+    assertEquals("Unexpected value for collection property: " + propertyName, expectedValue, value);
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index f00cdff..e09f89e 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -171,6 +172,9 @@ public class ZkStateReader implements SolrCloseable {
/** Collection properties being actively watched */
private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
+ /** PropsWatcher instances keyed by collection name; reused whenever a watch on that collection's properties is (re)set */
+ private final ConcurrentHashMap<String, PropsWatcher> collectionPropsWatchers = new ConcurrentHashMap<>();
+
private volatile SortedSet<String> liveNodes = emptySortedSet();
private volatile Map<String, Object> clusterProperties = Collections.emptyMap();
@@ -183,7 +187,8 @@ public class ZkStateReader implements SolrCloseable {
private ConcurrentHashMap<String, CollectionWatch<CollectionStateWatcher>> collectionWatches = new ConcurrentHashMap<>();
- private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsWatches = new ConcurrentHashMap<>();
+ // Named "observers" to reduce confusion between this map of CollectionPropsWatchers and the map of PropsWatchers.
+ private ConcurrentHashMap<String, CollectionWatch<CollectionPropsWatcher>> collectionPropsObservers = new ConcurrentHashMap<>();
private Set<CloudCollectionsListener> cloudCollectionsListeners = ConcurrentHashMap.newKeySet();
@@ -198,6 +203,8 @@ public class ZkStateReader implements SolrCloseable {
private static final long LAZY_CACHE_TIME = TimeUnit.NANOSECONDS.convert(STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
+ private Future<?> collectionPropsCacheCleaner; // only kept to identify if the cleaner has already been started.
+
/**
* Get current {@link AutoScalingConfig}.
* @return current configuration from <code>autoscaling.json</code>. NOTE:
@@ -484,8 +491,8 @@ public class ZkStateReader implements SolrCloseable {
securityData = getSecurityProps(true);
}
- collectionPropsWatches.forEach((k,v) -> {
- new PropsWatcher(k).refreshAndWatch(true);
+ collectionPropsObservers.forEach((k, v) -> {
+ collectionPropsWatchers.computeIfAbsent(k, PropsWatcher::new).refreshAndWatch(true);
});
}
@@ -1094,17 +1101,60 @@ public class ZkStateReader implements SolrCloseable {
/**
* Get collection properties for a given collection. If the collection is watched, simply return it from the cache,
- * otherwise fetch it directly from zookeeper.
+ * otherwise fetch it directly from zookeeper. This is a convenience for {@code getCollectionProperties(collection,0)}
+ *
+ * @param collection the collection for which properties are desired
+ * @return a map representing the key/value properties for the collection.
*/
public Map<String, String> getCollectionProperties(final String collection) {
+ return getCollectionProperties(collection,0); // 0 = request no caching (an unexpired cached copy is still returned)
+ }
+
+ /**
+ * Get and cache collection properties for a given collection. If the collection is watched, or still cached
+ * simply return it from the cache, otherwise fetch it directly from zookeeper and retain the value for at
+ * least cacheForMillis milliseconds. Cached properties are watched in zookeeper and updated automatically.
+ * This version of {@code getCollectionProperties} should be used when properties need to be consulted
+ * frequently in the absence of an active {@link CollectionPropsWatcher}.
+ *
+ * @param collection The collection for which properties are desired
+ * @param cacheForMillis The minimum number of milliseconds to maintain a cache for the specified collection's
+ * properties. Setting a {@code CollectionPropsWatcher} will override this value and retain
+ * the cache for the life of the watcher. A lack of changes in zookeeper may allow the
+ * caching to remain for a greater duration, up to the cycle time of the internal cache cleaner
+ * (one minute). Passing zero (or any non-positive value) removes the cached copy if and only if it
+ * has already expired and no {@code CollectionPropsWatcher} is registered for the collection. Any
+ * positive value will extend the expiration time if required.
+ * @return a map representing the key/value properties for the collection.
+ */
+ public Map<String, String> getCollectionProperties(final String collection, long cacheForMillis) {
synchronized (watchedCollectionProps) { // making decisions based on the result of a get...
+ Watcher watcher = null;
+ if (cacheForMillis > 0) {
+ watcher = collectionPropsWatchers.compute(collection,
+ (c, w) -> w == null ? new PropsWatcher(c, cacheForMillis) : w.renew(cacheForMillis));
+ }
VersionedCollectionProps vprops = watchedCollectionProps.get(collection);
- Map<String, String> properties = vprops != null ? vprops.props : null;
- if (properties == null) {
+ boolean haveUnexpiredProps = vprops != null && vprops.cacheUntilNs > System.nanoTime();
+ long untilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(cacheForMillis, TimeUnit.MILLISECONDS);
+ Map<String, String> properties;
+ if (haveUnexpiredProps) {
+ properties = vprops.props;
+ vprops.cacheUntilNs = Math.max(vprops.cacheUntilNs, untilNs);
+ } else {
try {
- // todo: maybe we want to store/watch since if someone's calling this it's probably going to get called again?
- // Not storing the value in watchedCollectionProps, because it can gat stale, since we have no watcher set.
- properties = fetchCollectionProperties(collection, null ).props;
+ VersionedCollectionProps vcp = fetchCollectionProperties(collection, watcher);
+ properties = vcp.props;
+ if (cacheForMillis > 0) {
+ vcp.cacheUntilNs = untilNs;
+ watchedCollectionProps.put(collection,vcp);
+ } else {
+ // we're synchronized on watchedCollectionProps and we can only get here if we have found an expired
+ // vprops above, so it is safe to remove the cached value and let the GC free up some mem a bit sooner.
+ if (!collectionPropsObservers.containsKey(collection)) {
+ watchedCollectionProps.remove(collection);
+ }
+ }
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error reading collection properties", SolrZkClient.checkInterrupted(e));
}
@@ -1114,13 +1164,14 @@ public class ZkStateReader implements SolrCloseable {
}
private class VersionedCollectionProps {
- public VersionedCollectionProps(int zkVersion, Map<String, String> props) {
+  int zkVersion; // -1 together with a previously cached entry is treated as a collection DELETE
+  Map<String,String> props;
+  // System.nanoTime() deadline until which these props may be served from the cache; stays 0 unless a
+  // caller of getCollectionProperties asked for caching. volatile because it is written by callers of
+  // getCollectionProperties and read concurrently by the CacheCleaner task on the notifications executor.
+  volatile long cacheUntilNs = 0;
+
+  VersionedCollectionProps(int zkVersion, Map<String, String> props) {
this.zkVersion = zkVersion;
this.props = props;
}
-
- int zkVersion;
- Map<String,String> props;
}
static String getCollectionPropsPath(final String collection) {
@@ -1130,6 +1181,14 @@ public class ZkStateReader implements SolrCloseable {
@SuppressWarnings("unchecked")
private VersionedCollectionProps fetchCollectionProperties(String collection, Watcher watcher) throws KeeperException, InterruptedException {
final String znodePath = getCollectionPropsPath(collection);
+ // lazy init cache cleaner once we know someone is using collection properties.
+ if (collectionPropsCacheCleaner == null) {
+ synchronized (this) { // There can be only one! :)
+ if (collectionPropsCacheCleaner == null) {
+ collectionPropsCacheCleaner = notifications.submit(new CacheCleaner());
+ }
+ }
+ }
while (true) {
try {
Stat stat = new Stat();
@@ -1279,9 +1338,21 @@ public class ZkStateReader implements SolrCloseable {
/** Watches collection properties */
class PropsWatcher implements Watcher {
private final String coll;
+ private long watchUntilNs;
PropsWatcher(String coll) {
this.coll = coll;
+ watchUntilNs = 0;
+ }
+
+ PropsWatcher(String coll, long forMillis) {
+ this.coll = coll;
+ watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS);
+ }
+
+ public PropsWatcher renew(long forMillis) {
+ watchUntilNs = System.nanoTime() + TimeUnit.NANOSECONDS.convert(forMillis,TimeUnit.MILLISECONDS);
+ return this;
}
@Override
@@ -1291,7 +1362,8 @@ public class ZkStateReader implements SolrCloseable {
return;
}
- if (!collectionPropsWatches.containsKey(coll)) {
+ boolean expired = System.nanoTime() > watchUntilNs;
+ if (!collectionPropsObservers.containsKey(coll) && expired) {
// No one can be notified of the change, we can ignore it and "unset" the watch
log.debug("Ignoring property change for collection {}", coll);
return;
@@ -1320,6 +1392,19 @@ public class ZkStateReader implements SolrCloseable {
if (notifyWatchers) {
notifyPropsWatchers(coll, properties);
}
+ if (vcp.zkVersion == -1 && existingVcp != null) { // Collection DELETE detected
+
+ // We should not be caching a collection that has been deleted.
+ watchedCollectionProps.remove(coll);
+
+ // core ref counting not relevant here, don't need canRemove(), we just sent
+ // a notification of an empty set of properties, no reason to watch what doesn't exist.
+ collectionPropsObservers.remove(coll);
+
+ // This is the one time we know it's safe to throw this out. We just failed to set the watch
+ // due to a NoNodeException, so it isn't held by ZK and can't re-set itself due to an update.
+ collectionPropsWatchers.remove(coll);
+ }
}
}
} catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
@@ -1708,7 +1793,7 @@ public class ZkStateReader implements SolrCloseable {
public void registerCollectionPropsWatcher(final String collection, CollectionPropsWatcher propsWatcher) {
AtomicBoolean watchSet = new AtomicBoolean(false);
- collectionPropsWatches.compute(collection, (k, v) -> {
+ collectionPropsObservers.compute(collection, (k, v) -> {
if (v == null) {
v = new CollectionWatch<>();
watchSet.set(true);
@@ -1718,12 +1803,12 @@ public class ZkStateReader implements SolrCloseable {
});
if (watchSet.get()) {
- new PropsWatcher(collection).refreshAndWatch(false);
+ collectionPropsWatchers.computeIfAbsent(collection, PropsWatcher::new).refreshAndWatch(false);
}
}
public void removeCollectionPropsWatcher(String collection, CollectionPropsWatcher watcher) {
- collectionPropsWatches.compute(collection, (k, v) -> {
+ collectionPropsObservers.compute(collection, (k, v) -> {
if (v == null)
return null;
v.stateWatchers.remove(watcher);
@@ -1983,30 +2068,46 @@ public class ZkStateReader implements SolrCloseable {
private class PropsNotification implements Runnable {
+ // Delivers one collection-properties change to the CollectionPropsWatchers that were registered
+ // for the collection at the time this notification was created.
- final String collection;
- final Map<String, String> collectionProperties;
+ private final String collection;
+ private final Map<String, String> collectionProperties;
+ private final List<CollectionPropsWatcher> watchers = new ArrayList<>();
private PropsNotification(String collection, Map<String, String> collectionProperties) {
this.collection = collection;
this.collectionProperties = collectionProperties;
- }
-
- @Override
- public void run() {
- List<CollectionPropsWatcher> watchers = new ArrayList<>();
- collectionPropsWatches.compute(collection, (k, v) -> {
+ // guarantee delivery of notification regardless of what happens to collectionPropsObservers
+ // while we wait our turn in the executor by capturing the list on creation.
+ collectionPropsObservers.compute(collection, (k, v) -> {
if (v == null)
return null;
watchers.addAll(v.stateWatchers);
return v;
});
+ }
+
+ // Any watcher whose onStateChanged returns true is unregistered after it has been notified.
+ @Override
+ public void run() {
for (CollectionPropsWatcher watcher : watchers) {
if (watcher.onStateChanged(collectionProperties)) {
removeCollectionPropsWatcher(collection, watcher);
}
}
}
+ }
+  /**
+   * Periodically evicts cached collection properties whose cache period has expired and for which no
+   * {@link CollectionPropsWatcher} is registered. Runs until its thread is interrupted (executor shutdown).
+   */
+  private class CacheCleaner implements Runnable {
+    /** Pause between eviction sweeps. */
+    private static final long SWEEP_INTERVAL_MS = 60000;
+
+    @Override
+    public void run() {
+      // isInterrupted() (unlike Thread.interrupted()) does not clear the flag, so the executor still sees it.
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          Thread.sleep(SWEEP_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          // Executor shutdown will send us an interrupt; restore the flag and stop.
+          Thread.currentThread().interrupt();
+          break;
+        }
+        // Take the same lock getCollectionProperties holds while deciding whether to reuse (and extend)
+        // a cached entry, so an entry is not evicted in the middle of that decision.
+        synchronized (watchedCollectionProps) {
+          final long now = System.nanoTime();
+          // Compare nanoTime values by subtraction; a plain '<' is not safe across numerical overflow.
+          watchedCollectionProps.entrySet().removeIf(entry ->
+              entry.getValue().cacheUntilNs - now < 0 && !collectionPropsObservers.containsKey(entry.getKey()));
+        }
+      }
+    }
}
}