You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/25 15:13:49 UTC
svn commit: r1697670 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/cloud/overseer/
solr/core/src/test/org/apache/solr/cloud/
solr/core/src/test/org/apache/solr/clou...
Author: shalin
Date: Tue Aug 25 13:13:48 2015
New Revision: 1697670
URL: http://svn.apache.org/r1697670
Log:
SOLR-6629: Watch /collections zk node on all nodes
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Tue Aug 25 13:13:48 2015
@@ -84,6 +84,10 @@ Optimizations
message processing performance by ~470%.
(Noble Paul, Scott Blum, shalin)
+* SOLR-6629: Watch /collections zk node on all nodes so that cluster state updates
+ are more efficient especially when cluster has a mix of collections in stateFormat=1
+ and stateFormat=2. (Scott Blum, shalin)
+
Other Changes
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Tue Aug 25 13:13:48 2015
@@ -2006,7 +2006,7 @@ public class OverseerCollectionMessageHa
boolean created = false;
while (! waitUntil.hasTimedOut()) {
Thread.sleep(100);
- created = zkStateReader.getClusterState().getCollections().contains(collectionName);
+ created = zkStateReader.getClusterState().hasCollection(collectionName);
if(created) break;
}
if (!created)
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Aug 25 13:13:48 2015
@@ -638,8 +638,9 @@ public final class ZkController {
cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
- cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
- cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH,"{}".getBytes(StandardCharsets.UTF_8),CreateMode.PERSISTENT, zkClient);
+ byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
+ cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
+ cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
}
private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java Tue Aug 25 13:13:48 2015
@@ -125,14 +125,24 @@ public class ZkStateWriter {
callback.onEnqueue();
}
+ /*
+ We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE)
+ */
+ DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
+ boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
+ boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
+
if (cmd.collection == null) {
- isClusterStateModified = true;
+ if (wasPreviouslyStateFormat1) {
+ isClusterStateModified = true;
+ }
clusterState = prevState.copyWith(cmd.name, null);
updates.put(cmd.name, null);
} else {
- if (cmd.collection.getStateFormat() > 1) {
+ if (!isCurrentlyStateFormat1) {
updates.put(cmd.name, cmd.collection);
- } else {
+ }
+ if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
isClusterStateModified = true;
}
clusterState = prevState.copyWith(cmd.name, cmd.collection);
@@ -211,6 +221,7 @@ public class ZkStateWriter {
if (c == null) {
// let's clean up the collections path for this collection
+ log.info("going to delete_collection {}", path);
reader.getZkClient().clean("/collections/" + name);
} else if (c.getStateFormat() > 1) {
byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
@@ -225,7 +236,6 @@ public class ZkStateWriter {
reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
clusterState = clusterState.copyWith(name, newCollection);
- isClusterStateModified = true;
}
} else if (c.getStateFormat() == 1) {
isClusterStateModified = true;
@@ -237,7 +247,6 @@ public class ZkStateWriter {
if (isClusterStateModified) {
assert clusterState.getZkClusterStateVersion() >= 0;
- lastUpdatedTime = System.nanoTime();
byte[] data = Utils.toJSON(clusterState);
Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
Set<String> collectionNames = clusterState.getCollections();
@@ -249,6 +258,7 @@ public class ZkStateWriter {
clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates);
isClusterStateModified = false;
}
+ lastUpdatedTime = System.nanoTime();
success = true;
} catch (KeeperException.BadVersionException bve) {
// this is a tragic error, we must disallow usage of this instance
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Tue Aug 25 13:13:48 2015
@@ -680,7 +680,7 @@ public class OverseerTest extends SolrTe
while (version == getClusterStateVersion(zkClient));
Thread.sleep(500);
assertFalse("collection1 should be gone after publishing the null state",
- reader.getClusterState().getCollections().contains(collection));
+ reader.getClusterState().hasCollection(collection));
} finally {
close(mockController);
close(overseerClient);
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java Tue Aug 25 13:13:48 2015
@@ -39,15 +39,25 @@ public class ZkStateReaderTest extends S
/** Uses explicit refresh to ensure latest changes are visible. */
public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
- testStateFormatUpdate(true);
+ testStateFormatUpdate(true, true);
+ }
+
+ /** Uses explicit refresh to ensure latest changes are visible, without adding a collection watch on c1. */
+ public void testStateFormatUpdateWithExplicitRefreshLazy() throws Exception {
+ testStateFormatUpdate(true, false);
}
/** ZkStateReader should automatically pick up changes based on ZK watches. */
public void testStateFormatUpdateWithTimeDelay() throws Exception {
- testStateFormatUpdate(false);
+ testStateFormatUpdate(false, true);
}
- public void testStateFormatUpdate(boolean explicitRefresh) throws Exception {
+ /** ZkStateReader should automatically pick up changes based on ZK watches, without adding a collection watch on c1. */
+ public void testStateFormatUpdateWithTimeDelayLazy() throws Exception {
+ testStateFormatUpdate(false, false);
+ }
+
+ public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting) throws Exception {
String zkDir = createTempDir("testStateFormatUpdate").toFile().getAbsolutePath();
ZkTestServer server = new ZkTestServer(zkDir);
@@ -64,7 +74,9 @@ public class ZkStateReaderTest extends S
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
- int trackedStateVersion = reader.getClusterState().getZkClusterStateVersion();
+ if (isInteresting) {
+ reader.addCollectionWatch("c1");
+ }
ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
@@ -82,7 +94,16 @@ public class ZkStateReaderTest extends S
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertFalse(exists);
- trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
+ if (explicitRefresh) {
+ reader.updateClusterState();
+ } else {
+ for (int i = 0; i < 100; ++i) {
+ if (reader.getClusterState().hasCollection("c1")) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+ }
DocCollection collection = reader.getClusterState().getCollection("c1");
assertEquals(1, collection.getStateFormat());
@@ -101,7 +122,16 @@ public class ZkStateReaderTest extends S
boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
assertTrue(exists);
- trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
+ if (explicitRefresh) {
+ reader.updateClusterState();
+ } else {
+ for (int i = 0; i < 100; ++i) {
+ if (reader.getClusterState().getCollection("c1").getStateFormat() == 2) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+ }
DocCollection collection = reader.getClusterState().getCollection("c1");
assertEquals(2, collection.getStateFormat());
@@ -138,7 +168,7 @@ public class ZkStateReaderTest extends S
new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
writer.enqueueUpdate(reader.getClusterState(), c1, null);
writer.writePendingUpdates();
- refresh(reader, 0, true);
+ reader.updateClusterState();
assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
reader.addCollectionWatch("c1");
@@ -151,20 +181,4 @@ public class ZkStateReaderTest extends S
server.shutdown();
}
}
-
- private static int refresh(ZkStateReader reader, int trackedStateVersion, boolean explicitRefresh) throws KeeperException, InterruptedException {
- if (explicitRefresh) {
- reader.updateClusterState();
- return reader.getClusterState().getZkClusterStateVersion();
- }
- for (int i = 0; i < 100; ++i) {
- // Loop until we observe the change.
- int newStateVersion = reader.getClusterState().getZkClusterStateVersion();
- if (newStateVersion > trackedStateVersion) {
- return newStateVersion;
- }
- Thread.sleep(100);
- }
- throw new AssertionError("Did not observe expected update");
- }
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java Tue Aug 25 13:13:48 2015
@@ -1094,7 +1094,7 @@ public class CloudSolrClient extends Sol
Set<String> collectionNames = new HashSet<>();
// validate collections
for (String collectionName : rawCollectionsList) {
- if (!clusterState.getCollections().contains(collectionName)) {
+ if (!clusterState.hasCollection(collectionName)) {
Aliases aliases = zkStateReader.getAliases();
String alias = aliases.getCollectionAlias(collectionName);
if (alias != null) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Tue Aug 25 13:13:48 2015
@@ -17,13 +17,6 @@ package org.apache.solr.common.cloud;
* limitations under the License.
*/
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.util.Utils;
-import org.noggit.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -33,6 +26,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.Watcher;
+import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Immutable state of the cloud. Normally you can get the state by using
* {@link ZkStateReader#getClusterState()}.
@@ -110,8 +111,16 @@ public class ClusterState implements JSO
return null;
}
- public boolean hasCollection(String coll) {
- return collectionStates.containsKey(coll) ;
+ /**
+ * Returns true if the specified collection name exists, false otherwise.
+ *
+ * Implementation note: This method resolves the collection reference by calling
+ * {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
+ * because the semantics of how collection list is loaded have changed in SOLR-6629.
+ * Please see javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
+ */
+ public boolean hasCollection(String collectionName) {
+ return getCollectionOrNull(collectionName) != null;
}
/**
@@ -170,19 +179,38 @@ public class ClusterState implements JSO
return collectionStates.get(coll);
}
- public DocCollection getCollectionOrNull(String coll) {
- CollectionRef ref = collectionStates.get(coll);
- return ref == null? null:ref.get();
+ /**
+ * Returns the corresponding {@link DocCollection} object for the given collection name
+ * if such a collection exists. Returns null otherwise.
+ *
+ * Implementation note: This method resolves the collection reference by calling
+ * {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
+ * because the semantics of how collection list is loaded have changed in SOLR-6629.
+ * Please see javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
+ */
+ public DocCollection getCollectionOrNull(String collectionName) {
+ CollectionRef ref = collectionStates.get(collectionName);
+ return ref == null ? null : ref.get();
}
/**
* Get collection names.
+ *
+ * Implementation note: This method resolves the collection reference by calling
+ * {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
+ * because the semantics of how collection list is loaded have changed in SOLR-6629.
+ * Please see javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
*/
public Set<String> getCollections() {
- return collectionStates.keySet();
+ Set<String> result = new HashSet<>();
+ for (Entry<String, CollectionRef> entry : collectionStates.entrySet()) {
+ if (entry.getValue().get() != null) {
+ result.add(entry.getKey());
+ }
+ }
+ return result;
}
-
/**
* Get names of the currently live nodes.
*/
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Tue Aug 25 13:13:48 2015
@@ -24,7 +24,6 @@ import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -122,7 +121,7 @@ public class ZkStateReader implements Cl
private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
/** Collections with format2 state.json, not "interesting" and not actively watched. */
- private volatile Map<String, ClusterState.CollectionRef> lazyCollectionStates = new HashMap<>();
+ private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<String, LazyCollectionRef>();
private volatile Set<String> liveNodes = emptySet();
@@ -262,7 +261,7 @@ public class ZkStateReader implements Cl
DocCollection newState = fetchCollectionState(coll, null);
updateWatchedCollection(coll, newState);
}
- refreshLazyFormat2Collections(true);
+ refreshCollectionList(null);
refreshLiveNodes(null);
constructState();
}
@@ -314,7 +313,7 @@ public class ZkStateReader implements Cl
// on reconnect of SolrZkClient force refresh and re-add watches.
refreshLegacyClusterState(new LegacyClusterStateWatcher());
refreshStateFormat2Collections();
- refreshLazyFormat2Collections(true);
+ refreshCollectionList(new CollectionsChildWatcher());
refreshLiveNodes(new LiveNodeWatcher());
synchronized (ZkStateReader.this.getUpdateLock()) {
@@ -451,7 +450,7 @@ public class ZkStateReader implements Cl
}
// Finally, add any lazy collections that aren't already accounted for.
- for (Map.Entry<String, ClusterState.CollectionRef> entry : lazyCollectionStates.entrySet()) {
+ for (Map.Entry<String, LazyCollectionRef> entry : lazyCollectionStates.entrySet()) {
if (!result.containsKey(entry.getKey())) {
result.put(entry.getKey(), entry.getValue());
}
@@ -501,78 +500,69 @@ public class ZkStateReader implements Cl
/**
* Search for any lazy-loadable state format2 collections.
+ *
+ * A stateFormat=1 collection which is not interesting to us can also
+ * be put into the {@link #lazyCollectionStates} map here. But that is okay
+ * because {@link #constructState()} will give priority to collections in the
+ * shared collection state over this map.
+ * In fact this is a clever way to avoid doing a ZK exists check on
+ * the /collections/collection_name/state.json znode.
+ * Such an exists check is done in the {@link ClusterState#hasCollection(String)} and
+ * {@link ClusterState#getCollections()} methods as a safeguard against exposing wrong collection names to the users.
*/
- private void refreshLazyFormat2Collections(boolean fullRefresh) throws KeeperException, InterruptedException {
+ private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
List<String> children = null;
try {
- children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
+ children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
} catch (KeeperException.NoNodeException e) {
log.warn("Error fetching collection names");
// fall through
}
if (children == null || children.isEmpty()) {
- synchronized (getUpdateLock()) {
- this.lazyCollectionStates = new HashMap<>();
- }
+ lazyCollectionStates.clear();
return;
}
- Map<String, ClusterState.CollectionRef> result = new HashMap<>();
- for (String collName : children) {
- if (interestingCollections.contains(collName)) {
- // We will create an eager collection for any interesting collections.
- continue;
- }
+ // Don't mess with watchedCollections, they should self-manage.
- if (!fullRefresh) {
- // Try to use an already-created lazy collection if it's not a full refresh.
- ClusterState.CollectionRef existing = lazyCollectionStates.get(collName);
- if (existing != null) {
- result.put(collName, existing);
- continue;
+ // First, drop any children that disappeared.
+ this.lazyCollectionStates.keySet().retainAll(children);
+ for (String coll : children) {
+ // We will create an eager collection for any interesting collections, so don't add to lazy.
+ if (!interestingCollections.contains(coll)) {
+ // Double check contains just to avoid allocating an object.
+ LazyCollectionRef existing = lazyCollectionStates.get(coll);
+ if (existing == null) {
+ lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
}
}
-
- ClusterState.CollectionRef lazyCollectionState = tryMakeLazyCollectionStateFormat2(collName);
- if (lazyCollectionState != null) {
- result.put(collName, lazyCollectionState);
- }
- }
-
- synchronized (getUpdateLock()) {
- this.lazyCollectionStates = result;
}
}
- private ClusterState.CollectionRef tryMakeLazyCollectionStateFormat2(final String collName) {
- boolean exists = false;
- try {
- exists = zkClient.exists(getCollectionPath(collName), true);
- } catch (Exception e) {
- log.warn("Error reading collections nodes", e);
- }
- if (!exists) {
- return null;
+ private class LazyCollectionRef extends ClusterState.CollectionRef {
+
+ private final String collName;
+
+ public LazyCollectionRef(String collName) {
+ super(null);
+ this.collName = collName;
}
- // if it is not collection, then just create a reference which can fetch
- // the collection object just in time from ZK
- return new ClusterState.CollectionRef(null) {
- @Override
- public DocCollection get() {
- return getCollectionLive(ZkStateReader.this, collName);
- }
+ @Override
+ public DocCollection get() {
+ // TODO: consider limited caching
+ return getCollectionLive(ZkStateReader.this, collName);
+ }
- @Override
- public boolean isLazilyLoaded() {
- return true;
- }
+ @Override
+ public boolean isLazilyLoaded() {
+ return true;
+ }
- @Override
- public String toString() {
- return "lazy DocCollection(" + collName + ")";
- }
- };
+ @Override
+ public String toString() {
+ return "LazyCollectionRef(" + collName + ")";
+ }
}
/**
@@ -931,9 +921,6 @@ public class ZkStateReader implements Cl
public void refreshAndWatch() {
try {
refreshLegacyClusterState(this);
- // Changes to clusterstate.json signal global state changes.
- // TODO: get rid of clusterstate.json as a signaling mechanism.
- refreshLazyFormat2Collections(false);
} catch (KeeperException.NoNodeException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
@@ -954,6 +941,44 @@ public class ZkStateReader implements Cl
}
}
+ /** Watches /collections children. */
+ class CollectionsChildWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent event) {
+ // session events are not change events,
+ // and do not remove the watcher
+ if (EventType.None.equals(event.getType())) {
+ return;
+ }
+ log.info("A collections change: {}, has occurred - updating...", (event));
+ refreshAndWatch();
+ synchronized (getUpdateLock()) {
+ constructState();
+ }
+ }
+
+ /** Refreshes the /collections child list and re-registers this watcher. Called from {@link #process(WatchedEvent)} without holding {@link #getUpdateLock()}. */
+ public void refreshAndWatch() {
+ try {
+ refreshCollectionList(this);
+ } catch (KeeperException e) {
+ if (e.code() == KeeperException.Code.SESSIONEXPIRED
+ || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+ log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+ return;
+ }
+ log.error("", e);
+ throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+ "", e);
+ } catch (InterruptedException e) {
+ // Restore the interrupted status
+ Thread.currentThread().interrupt();
+ log.warn("", e);
+ }
+ }
+ }
+
/** Watches the live_nodes and syncs changes. */
class LiveNodeWatcher implements Watcher {
@@ -968,7 +993,6 @@ public class ZkStateReader implements Cl
refreshAndWatch();
}
- /** Must hold {@link #getUpdateLock()} before calling this method. */
public void refreshAndWatch() {
try {
refreshLiveNodes(this);
@@ -1073,13 +1097,8 @@ public class ZkStateReader implements Cl
log.info("Removing watch for uninteresting collection {}", coll);
interestingCollections.remove(coll);
watchedCollectionStates.remove(coll);
- ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
+ lazyCollectionStates.put(coll, new LazyCollectionRef(coll));
synchronized (getUpdateLock()) {
- if (lazyCollectionStateFormat2 != null) {
- this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
- } else {
- this.lazyCollectionStates.remove(coll);
- }
constructState();
}
}