You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/25 15:13:49 UTC

svn commit: r1697670 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/cloud/overseer/ solr/core/src/test/org/apache/solr/cloud/ solr/core/src/test/org/apache/solr/clou...

Author: shalin
Date: Tue Aug 25 13:13:48 2015
New Revision: 1697670

URL: http://svn.apache.org/r1697670
Log:
SOLR-6629: Watch /collections zk node on all nodes

Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Tue Aug 25 13:13:48 2015
@@ -84,6 +84,10 @@ Optimizations
   message processing performance by ~470%.
   (Noble Paul, Scott Blum, shalin)
 
+* SOLR-6629: Watch /collections zk node on all nodes so that cluster state updates
+  are more efficient especially when cluster has a mix of collections in stateFormat=1
+  and stateFormat=2. (Scott Blum, shalin)
+
 Other Changes
 ----------------------
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Tue Aug 25 13:13:48 2015
@@ -2006,7 +2006,7 @@ public class OverseerCollectionMessageHa
       boolean created = false;
       while (! waitUntil.hasTimedOut()) {
         Thread.sleep(100);
-        created = zkStateReader.getClusterState().getCollections().contains(collectionName);
+        created = zkStateReader.getClusterState().hasCollection(collectionName);
         if(created) break;
       }
       if (!created)

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Aug 25 13:13:48 2015
@@ -638,8 +638,9 @@ public final class ZkController {
     cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
-    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
-    cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH,"{}".getBytes(StandardCharsets.UTF_8),CreateMode.PERSISTENT, zkClient);
+    byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
+    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
   }
 
   private void init(CurrentCoreDescriptorProvider registerOnReconnect) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java Tue Aug 25 13:13:48 2015
@@ -125,14 +125,24 @@ public class ZkStateWriter {
       callback.onEnqueue();
     }
 
+    /*
+    We need to know if the collection has moved from stateFormat=1 to stateFormat=2 (as a result of MIGRATECLUSTERSTATE)
+     */
+    DocCollection previousCollection = prevState.getCollectionOrNull(cmd.name);
+    boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
+    boolean isCurrentlyStateFormat1 = cmd.collection != null && cmd.collection.getStateFormat() == 1;
+
     if (cmd.collection == null) {
-      isClusterStateModified = true;
+      if (wasPreviouslyStateFormat1) {
+        isClusterStateModified = true;
+      }
       clusterState = prevState.copyWith(cmd.name, null);
       updates.put(cmd.name, null);
     } else {
-      if (cmd.collection.getStateFormat() > 1) {
+      if (!isCurrentlyStateFormat1) {
         updates.put(cmd.name, cmd.collection);
-      } else {
+      }
+      if (isCurrentlyStateFormat1 || wasPreviouslyStateFormat1) {
         isClusterStateModified = true;
       }
       clusterState = prevState.copyWith(cmd.name, cmd.collection);
@@ -211,6 +221,7 @@ public class ZkStateWriter {
 
           if (c == null) {
             // let's clean up the collections path for this collection
+            log.info("going to delete_collection {}", path);
             reader.getZkClient().clean("/collections/" + name);
           } else if (c.getStateFormat() > 1) {
             byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
@@ -225,7 +236,6 @@ public class ZkStateWriter {
               reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
               DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
               clusterState = clusterState.copyWith(name, newCollection);
-              isClusterStateModified = true;
             }
           } else if (c.getStateFormat() == 1) {
             isClusterStateModified = true;
@@ -237,7 +247,6 @@ public class ZkStateWriter {
 
       if (isClusterStateModified) {
         assert clusterState.getZkClusterStateVersion() >= 0;
-        lastUpdatedTime = System.nanoTime();
         byte[] data = Utils.toJSON(clusterState);
         Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
         Set<String> collectionNames = clusterState.getCollections();
@@ -249,6 +258,7 @@ public class ZkStateWriter {
         clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collectionStates);
         isClusterStateModified = false;
       }
+      lastUpdatedTime = System.nanoTime();
       success = true;
     } catch (KeeperException.BadVersionException bve) {
       // this is a tragic error, we must disallow usage of this instance

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java Tue Aug 25 13:13:48 2015
@@ -680,7 +680,7 @@ public class OverseerTest extends SolrTe
       while (version == getClusterStateVersion(zkClient));
       Thread.sleep(500);
       assertFalse("collection1 should be gone after publishing the null state",
-          reader.getClusterState().getCollections().contains(collection));
+          reader.getClusterState().hasCollection(collection));
     } finally {
       close(mockController);
       close(overseerClient);

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java Tue Aug 25 13:13:48 2015
@@ -39,15 +39,25 @@ public class ZkStateReaderTest extends S
 
   /** Uses explicit refresh to ensure latest changes are visible. */
   public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
-    testStateFormatUpdate(true);
+    testStateFormatUpdate(true, true);
+  }
+
+  /** Uses explicit refresh to ensure latest changes are visible. */
+  public void testStateFormatUpdateWithExplicitRefreshLazy() throws Exception {
+    testStateFormatUpdate(true, false);
   }
 
   /** ZkStateReader should automatically pick up changes based on ZK watches. */
   public void testStateFormatUpdateWithTimeDelay() throws Exception {
-    testStateFormatUpdate(false);
+    testStateFormatUpdate(false, true);
   }
 
-  public void testStateFormatUpdate(boolean explicitRefresh) throws Exception {
+  /** ZkStateReader should automatically pick up changes based on ZK watches. */
+  public void testStateFormatUpdateWithTimeDelayLazy() throws Exception {
+    testStateFormatUpdate(false, false);
+  }
+
+  public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting) throws Exception {
     String zkDir = createTempDir("testStateFormatUpdate").toFile().getAbsolutePath();
 
     ZkTestServer server = new ZkTestServer(zkDir);
@@ -64,7 +74,9 @@ public class ZkStateReaderTest extends S
 
       ZkStateReader reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
-      int trackedStateVersion = reader.getClusterState().getZkClusterStateVersion();
+      if (isInteresting) {
+        reader.addCollectionWatch("c1");
+      }
 
       ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
 
@@ -82,7 +94,16 @@ public class ZkStateReaderTest extends S
         boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
         assertFalse(exists);
 
-        trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
+        if (explicitRefresh) {
+          reader.updateClusterState();
+        } else {
+          for (int i = 0; i < 100; ++i) {
+            if (reader.getClusterState().hasCollection("c1")) {
+              break;
+            }
+            Thread.sleep(50);
+          }
+        }
 
         DocCollection collection = reader.getClusterState().getCollection("c1");
         assertEquals(1, collection.getStateFormat());
@@ -101,7 +122,16 @@ public class ZkStateReaderTest extends S
         boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
         assertTrue(exists);
 
-        trackedStateVersion = refresh(reader, trackedStateVersion, explicitRefresh);
+        if (explicitRefresh) {
+          reader.updateClusterState();
+        } else {
+          for (int i = 0; i < 100; ++i) {
+            if (reader.getClusterState().getCollection("c1").getStateFormat() == 2) {
+              break;
+            }
+            Thread.sleep(50);
+          }
+        }
 
         DocCollection collection = reader.getClusterState().getCollection("c1");
         assertEquals(2, collection.getStateFormat());
@@ -138,7 +168,7 @@ public class ZkStateReaderTest extends S
           new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
       writer.enqueueUpdate(reader.getClusterState(), c1, null);
       writer.writePendingUpdates();
-      refresh(reader, 0, true);
+      reader.updateClusterState();
 
       assertTrue(reader.getClusterState().getCollectionRef("c1").isLazilyLoaded());
       reader.addCollectionWatch("c1");
@@ -151,20 +181,4 @@ public class ZkStateReaderTest extends S
       server.shutdown();
     }
   }
-
-  private static int refresh(ZkStateReader reader, int trackedStateVersion, boolean explicitRefresh) throws KeeperException, InterruptedException {
-    if (explicitRefresh) {
-      reader.updateClusterState();
-      return reader.getClusterState().getZkClusterStateVersion();
-    }
-    for (int i = 0; i < 100; ++i) {
-      // Loop until we observe the change.
-      int newStateVersion = reader.getClusterState().getZkClusterStateVersion();
-      if (newStateVersion > trackedStateVersion) {
-        return newStateVersion;
-      }
-      Thread.sleep(100);
-    }
-    throw new AssertionError("Did not observe expected update");
-  }
 }

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java Tue Aug 25 13:13:48 2015
@@ -1094,7 +1094,7 @@ public class CloudSolrClient extends Sol
     Set<String> collectionNames = new HashSet<>();
     // validate collections
     for (String collectionName : rawCollectionsList) {
-      if (!clusterState.getCollections().contains(collectionName)) {
+      if (!clusterState.hasCollection(collectionName)) {
         Aliases aliases = zkStateReader.getAliases();
         String alias = aliases.getCollectionAlias(collectionName);
         if (alias != null) {

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java Tue Aug 25 13:13:48 2015
@@ -17,13 +17,6 @@ package org.apache.solr.common.cloud;
  * limitations under the License.
  */
 
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.util.Utils;
-import org.noggit.JSONWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +26,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.Watcher;
+import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Immutable state of the cloud. Normally you can get the state by using
  * {@link ZkStateReader#getClusterState()}.
@@ -110,8 +111,16 @@ public class ClusterState implements JSO
     return null;
   }
 
-  public boolean hasCollection(String coll) {
-    return  collectionStates.containsKey(coll) ;
+  /**
+   * Returns true if the specified collection name exists, false otherwise.
+   *
+   * Implementation note: This method resolves the collection reference by calling
+   * {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
+   * because the semantics of how collection list is loaded have changed in SOLR-6629.
+   * Please see javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
+   */
+  public boolean hasCollection(String collectionName) {
+    return getCollectionOrNull(collectionName) != null;
   }
 
   /**
@@ -170,19 +179,38 @@ public class ClusterState implements JSO
     return  collectionStates.get(coll);
   }
 
-  public DocCollection getCollectionOrNull(String coll) {
-    CollectionRef ref = collectionStates.get(coll);
-    return ref == null? null:ref.get();
+  /**
+   * Returns the corresponding {@link DocCollection} object for the given collection name
+   * if such a collection exists. Returns null otherwise.
+   *
+   * Implementation note: This method resolves the collection reference by calling
+   * {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
+   * because the semantics of how collection list is loaded have changed in SOLR-6629.
+   * Please see javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
+   */
+  public DocCollection getCollectionOrNull(String collectionName) {
+    CollectionRef ref = collectionStates.get(collectionName);
+    return ref == null ? null : ref.get();
   }
 
   /**
    * Get collection names.
+   *
+   * Implementation note: This method resolves the collection reference by calling
+   * {@link CollectionRef#get()} which can make a call to ZooKeeper. This is necessary
+   * because the semantics of how collection list is loaded have changed in SOLR-6629.
+   * Please see javadocs in {@link ZkStateReader#refreshCollectionList(Watcher)}
    */
   public Set<String> getCollections() {
-    return collectionStates.keySet();
+    Set<String> result = new HashSet<>();
+    for (Entry<String, CollectionRef> entry : collectionStates.entrySet()) {
+      if (entry.getValue().get() != null) {
+        result.add(entry.getKey());
+      }
+    }
+    return result;
   }
 
-
   /**
    * Get names of the currently live nodes.
    */

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1697670&r1=1697669&r2=1697670&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Tue Aug 25 13:13:48 2015
@@ -24,7 +24,6 @@ import java.net.URLDecoder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -122,7 +121,7 @@ public class ZkStateReader implements Cl
   private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<String, DocCollection>();
 
   /** Collections with format2 state.json, not "interesting" and not actively watched. */
-  private volatile Map<String, ClusterState.CollectionRef> lazyCollectionStates = new HashMap<>();
+  private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<String, LazyCollectionRef>();
 
   private volatile Set<String> liveNodes = emptySet();
 
@@ -262,7 +261,7 @@ public class ZkStateReader implements Cl
         DocCollection newState = fetchCollectionState(coll, null);
         updateWatchedCollection(coll, newState);
       }
-      refreshLazyFormat2Collections(true);
+      refreshCollectionList(null);
       refreshLiveNodes(null);
       constructState();
     }
@@ -314,7 +313,7 @@ public class ZkStateReader implements Cl
     // on reconnect of SolrZkClient force refresh and re-add watches.
     refreshLegacyClusterState(new LegacyClusterStateWatcher());
     refreshStateFormat2Collections();
-    refreshLazyFormat2Collections(true);
+    refreshCollectionList(new CollectionsChildWatcher());
     refreshLiveNodes(new LiveNodeWatcher());
 
     synchronized (ZkStateReader.this.getUpdateLock()) {
@@ -451,7 +450,7 @@ public class ZkStateReader implements Cl
     }
 
     // Finally, add any lazy collections that aren't already accounted for.
-    for (Map.Entry<String, ClusterState.CollectionRef> entry : lazyCollectionStates.entrySet()) {
+    for (Map.Entry<String, LazyCollectionRef> entry : lazyCollectionStates.entrySet()) {
       if (!result.containsKey(entry.getKey())) {
         result.put(entry.getKey(), entry.getValue());
       }
@@ -501,78 +500,69 @@ public class ZkStateReader implements Cl
 
   /**
    * Search for any lazy-loadable state format2 collections.
+   *
+   * A stateFormat=1 collection which is not interesting to us can also
+   * be put into the {@link #lazyCollectionStates} map here. But that is okay
+   * because {@link #constructState()} will give priority to collections in the
+   * shared collection state over this map.
+   * In fact this is a clever way to avoid doing a ZK exists check on
+   * the /collections/collection_name/state.json znode
+   * Such an exists check is done in {@link ClusterState#hasCollection(String)} and
+   * {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
    */
-  private void refreshLazyFormat2Collections(boolean fullRefresh) throws KeeperException, InterruptedException {
+  private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
     List<String> children = null;
     try {
-      children = zkClient.getChildren(COLLECTIONS_ZKNODE, null, true);
+      children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
     } catch (KeeperException.NoNodeException e) {
       log.warn("Error fetching collection names");
       // fall through
     }
     if (children == null || children.isEmpty()) {
-      synchronized (getUpdateLock()) {
-        this.lazyCollectionStates = new HashMap<>();
-      }
+      lazyCollectionStates.clear();
       return;
     }
 
-    Map<String, ClusterState.CollectionRef> result = new HashMap<>();
-    for (String collName : children) {
-      if (interestingCollections.contains(collName)) {
-        // We will create an eager collection for any interesting collections.
-        continue;
-      }
+    // Don't mess with watchedCollections, they should self-manage.
 
-      if (!fullRefresh) {
-        // Try to use an already-created lazy collection if it's not a full refresh.
-        ClusterState.CollectionRef existing = lazyCollectionStates.get(collName);
-        if (existing != null) {
-          result.put(collName, existing);
-          continue;
+    // First, drop any children that disappeared.
+    this.lazyCollectionStates.keySet().retainAll(children);
+    for (String coll : children) {
+      // We will create an eager collection for any interesting collections, so don't add to lazy.
+      if (!interestingCollections.contains(coll)) {
+        // Double check contains just to avoid allocating an object.
+        LazyCollectionRef existing = lazyCollectionStates.get(coll);
+        if (existing == null) {
+          lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
         }
       }
-
-      ClusterState.CollectionRef lazyCollectionState = tryMakeLazyCollectionStateFormat2(collName);
-      if (lazyCollectionState != null) {
-        result.put(collName, lazyCollectionState);
-      }
-    }
-
-    synchronized (getUpdateLock()) {
-      this.lazyCollectionStates = result;
     }
   }
 
-  private ClusterState.CollectionRef tryMakeLazyCollectionStateFormat2(final String collName) {
-    boolean exists = false;
-    try {
-      exists = zkClient.exists(getCollectionPath(collName), true);
-    } catch (Exception e) {
-      log.warn("Error reading collections nodes", e);
-    }
-    if (!exists) {
-      return null;
+  private class LazyCollectionRef extends ClusterState.CollectionRef {
+
+    private final String collName;
+
+    public LazyCollectionRef(String collName) {
+      super(null);
+      this.collName = collName;
     }
 
-    // if it is not collection, then just create a reference which can fetch
-    // the collection object just in time from ZK
-    return new ClusterState.CollectionRef(null) {
-      @Override
-      public DocCollection get() {
-        return getCollectionLive(ZkStateReader.this, collName);
-      }
+    @Override
+    public DocCollection get() {
+      // TODO: consider limited caching
+      return getCollectionLive(ZkStateReader.this, collName);
+    }
 
-      @Override
-      public boolean isLazilyLoaded() {
-        return true;
-      }
+    @Override
+    public boolean isLazilyLoaded() {
+      return true;
+    }
 
-      @Override
-      public String toString() {
-        return "lazy DocCollection(" + collName + ")";
-      }
-    };
+    @Override
+    public String toString() {
+      return "LazyCollectionRef(" + collName + ")";
+    }
   }
 
   /**
@@ -931,9 +921,6 @@ public class ZkStateReader implements Cl
     public void refreshAndWatch() {
       try {
         refreshLegacyClusterState(this);
-        // Changes to clusterstate.json signal global state changes.
-        // TODO: get rid of clusterstate.json as a signaling mechanism.
-        refreshLazyFormat2Collections(false);
       } catch (KeeperException.NoNodeException e) {
         throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
                 "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
@@ -954,6 +941,44 @@ public class ZkStateReader implements Cl
     }
   }
 
+  /** Watches /collections children. */
+  class CollectionsChildWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      // session events are not change events,
+      // and do not remove the watcher
+      if (EventType.None.equals(event.getType())) {
+        return;
+      }
+      log.info("A collections change: {}, has occurred - updating...", (event));
+      refreshAndWatch();
+      synchronized (getUpdateLock()) {
+        constructState();
+      }
+    }
+
+    /** Must hold {@link #getUpdateLock()} before calling this method. */
+    public void refreshAndWatch() {
+      try {
+        refreshCollectionList(this);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.SESSIONEXPIRED
+            || e.code() == KeeperException.Code.CONNECTIONLOSS) {
+          log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
+          return;
+        }
+        log.error("", e);
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
+            "", e);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        log.warn("", e);
+      }
+    }
+  }
+
   /** Watches the live_nodes and syncs changes. */
   class LiveNodeWatcher implements Watcher {
 
@@ -968,7 +993,6 @@ public class ZkStateReader implements Cl
       refreshAndWatch();
     }
 
-    /** Must hold {@link #getUpdateLock()} before calling this method. */
     public void refreshAndWatch() {
       try {
         refreshLiveNodes(this);
@@ -1073,13 +1097,8 @@ public class ZkStateReader implements Cl
     log.info("Removing watch for uninteresting collection {}", coll);
     interestingCollections.remove(coll);
     watchedCollectionStates.remove(coll);
-    ClusterState.CollectionRef lazyCollectionStateFormat2 = tryMakeLazyCollectionStateFormat2(coll);
+    lazyCollectionStates.put(coll, new LazyCollectionRef(coll));
     synchronized (getUpdateLock()) {
-      if (lazyCollectionStateFormat2 != null) {
-        this.lazyCollectionStates.put(coll, lazyCollectionStateFormat2);
-      } else {
-        this.lazyCollectionStates.remove(coll);
-      }
       constructState();
     }
   }