You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dr...@apache.org on 2016/04/21 01:13:41 UTC

[1/2] lucene-solr:branch_5_5: SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe.

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_5_5 b22c42ae5 -> 802ee6f8c


SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f45fae5c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f45fae5c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f45fae5c

Branch: refs/heads/branch_5_5
Commit: f45fae5c81e68763c99826524bec8b9273dfefd4
Parents: b22c42a
Author: markrmiller <ma...@apache.org>
Authored: Wed Apr 13 09:54:40 2016 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Apr 20 18:07:24 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../apache/solr/cloud/TestStressLiveNodes.java  | 252 +++++++++++++++++++
 .../apache/solr/common/cloud/ZkStateReader.java |  86 ++++---
 3 files changed, 313 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f45fae5c/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index aade056..017b837 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -72,6 +72,9 @@ Bug Fixes
 * SOLR-8416: The collections create API should return after all replicas are active.
   (Michael Sun, Mark Miller, Alexey Serba)
 
+* SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
+  sarowe, Erick Erickson, Mark Miller, shalin)
+
 ======================= 5.5.0 =======================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f45fae5c/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
new file mode 100644
index 0000000..466e344
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestStressLiveNodes.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.embedded.JettyConfig;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.util.AbstractSolrTestCase;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+
+import org.apache.zookeeper.CreateMode;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stress test LiveNodes watching.
+ *
+ * Does bursts of adds to live_nodes using parallel threads and verifies that after each
+ * burst a ZkStateReader detects the correct set.
+ */
+@Slow
+public class TestStressLiveNodes extends AbstractSolrTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static MiniSolrCloudCluster CLUSTER;
+
+  /** A basic cloud client, we'll be testing the behavior of its ZkStateReader */
+  private static CloudSolrClient CLOUD_CLIENT;
+  
+  /** The addr of the zk server used in this test */
+  private static String ZK_SERVER_ADDR;
+
+  /* how many seconds we're willing to wait for our executor tasks to finish before failing the test */
+  private static final int WAIT_TIME = TEST_NIGHTLY ? 60 : 30;
+
+  @BeforeClass
+  private static void createMiniSolrCloudCluster() throws Exception {
+    JettyConfig.Builder jettyConfig = JettyConfig.builder();
+    jettyConfig.waitForLoadingCoresToFinish(null);
+    CLUSTER = new MiniSolrCloudCluster(1, createTempDir(), jettyConfig.build());
+
+    CLOUD_CLIENT = CLUSTER.getSolrClient();
+    CLOUD_CLIENT.connect(); // force connection even though we aren't sending any requests
+
+    ZK_SERVER_ADDR = CLUSTER.getZkServer().getZkAddress();
+  }
+
+  @AfterClass
+  private static void shutdownMiniSolrCloudCluster() throws Exception {
+    CLUSTER.shutdown();
+  }
+
+  private static SolrZkClient newSolrZkClient() {
+    assertNotNull(ZK_SERVER_ADDR);
+    // CloudConfigBuilder.DEFAULT_ZK_CLIENT_TIMEOUT is private, so a client timeout is hard-coded here
+    return new SolrZkClient(ZK_SERVER_ADDR, 15000);
+  }
+
+  /** returns the true set of live nodes (currently in zk) as a sorted list */
+  private static List<String> getTrueLiveNodesFromZk() throws Exception {
+    SolrZkClient client = newSolrZkClient();
+    try {
+      ArrayList<String> result = new ArrayList<>(client.getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true));
+      Collections.sort(result);
+      return result;
+    } finally {
+      client.close();
+    }
+  }
+
+  /** 
+   * returns the cached set of live nodes (according to the ZkStateReader in our CloudSolrClient) 
+   * as a sorted list. 
+   * This is done in a sleep+retry loop until the result matches the expectedCount, or a few iters have passed
+   * (this way we aren't testing how fast the watchers complete, just that they got the correct result)
+   */
+  private static List<String> getCachedLiveNodesFromLocalState(final int expectedCount) throws Exception {
+    ArrayList<String> result = null;
+
+    for (int i = 0; i < 10; i++) {
+      result = new ArrayList<>(CLOUD_CLIENT.getZkStateReader().getClusterState().getLiveNodes());
+      if (expectedCount != result.size()) {
+        log.info("sleeping #{} to give watchers a chance to finish: {} != {}",
+                 i, expectedCount, result.size());
+        Thread.sleep(200);
+      } else {
+        break;
+      }
+    }
+    if (expectedCount != result.size()) {
+      log.error("gave up waiting for live nodes to match expected size: {} != {}",
+                expectedCount, result.size());
+    }
+    Collections.sort(result);
+    return result;
+  }
+  
+  public void testStress() throws Exception {
+
+    // do many iters, so we have "bursts" of adding nodes that we then check
+    final int numIters = atLeast(1000);
+    for (int iter = 0; iter < numIters; iter++) {
+
+      // sanity check that ZK says there is in fact 1 live node
+      List<String> actualLiveNodes = getTrueLiveNodesFromZk();
+      assertEquals("iter"+iter+": " + actualLiveNodes.toString(),
+                   1, actualLiveNodes.size());
+
+      // only here do we forcibly update the cached live nodes so we don't have to wait for it to catch up
+      // with all the ephemeral nodes that vanished after the last iteration
+      CLOUD_CLIENT.getZkStateReader().updateLiveNodes();
+      
+      // sanity check that our Cloud Client's local state knows about the 1 (real) live node in our cluster
+      List<String> cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
+      assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
+                   actualLiveNodes, cachedLiveNodes);
+      
+      
+      // start spinning up some threads to add some live_node children in parallel
+
+      // we don't need a lot of threads or nodes (we don't want to swamp the CPUs
+      // just bursts of concurrent adds) but we do want to randomize it a bit so we increase the
+      // odds of concurrent watchers firing regardless of the num CPUs or load on the machine running
+      // the test (but we deliberately don't look at availableProcessors() since we want randomization
+      // consistency across all machines for a given seed)
+      final int numThreads = TestUtil.nextInt(random(), 2, 5);
+      
+      // use same num for all thrashers, to increase likelihood of them all competing
+      // (diff random number would mean heavy concurrency only for ~ the first N=lowest num requests)
+      //
+      // this does not need to be a large number -- in fact, the higher it is, the more
+      // likely we are to see a mistake in early watcher triggers get "corrected" by a later one
+      // and overlook a possible bug
+      final int numNodesPerThrasher = TestUtil.nextInt(random(), 1, 5);
+      
+      log.info("preparing parallel adds to live nodes: iter={}, numThreads={} numNodesPerThread={}",
+               iter, numThreads, numNodesPerThrasher);
+      
+      // NOTE: using ephemeral nodes
+      // so we can't close any of these thrashers until we are done with our assertions
+      final List<LiveNodeTrasher> thrashers = new ArrayList<>(numThreads);
+      for (int i = 0; i < numThreads; i++) {
+        thrashers.add(new LiveNodeTrasher("T"+iter+"_"+i, numNodesPerThrasher));
+      }
+      try {
+        final ExecutorService executorService = ExecutorUtil.newMDCAwareFixedThreadPool
+          (thrashers.size()+1, new DefaultSolrThreadFactory("test_live_nodes_thrasher_iter"+iter));
+        // timed invokeAll: an untimed call blocks until every task is done, so the stop() fallback below could never run
+        executorService.invokeAll(thrashers, WAIT_TIME, TimeUnit.SECONDS);
+        executorService.shutdown();
+        if (! executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
+          for (LiveNodeTrasher thrasher : thrashers) {
+            thrasher.stop();
+          }
+        }
+        assertTrue("iter"+iter+": thrashers didn't finish even after explicitly stopping",
+                   executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS));
+
+        // sanity check the *real* live_nodes entries from ZK match what the thrashers added
+        int totalAdded = 1; // 1 real live node when we started
+        for (LiveNodeTrasher thrasher : thrashers) {
+          totalAdded += thrasher.getNumAdded();
+        }
+        actualLiveNodes = getTrueLiveNodesFromZk();
+        assertEquals("iter"+iter, totalAdded, actualLiveNodes.size());
+        
+        // verify our local client knows the correct set of live nodes
+        cachedLiveNodes = getCachedLiveNodesFromLocalState(actualLiveNodes.size());
+        assertEquals("iter"+iter+" " + actualLiveNodes.size() + " != " + cachedLiveNodes.size(),
+                     actualLiveNodes, cachedLiveNodes);
+        
+      } finally {
+        for (LiveNodeTrasher thrasher : thrashers) {
+          // shutdown our zk connection, freeing our ephemeral nodes
+          thrasher.close();
+        }
+      }
+    }
+  }
+
+  /** NOTE: has internal counter which is not thread safe, only call() in one thread at a time */
+  public static final class LiveNodeTrasher implements Callable<Integer> {
+    private final String id;
+    private final int numNodesToAdd;
+    private final SolrZkClient client;
+
+    private volatile boolean running = false; // volatile: stop() runs on a different thread than call()
+    private int numAdded = 0;
+    
+    /** ID should ideally be unique amongst any other instances */
+    public LiveNodeTrasher(String id, int numNodesToAdd) {
+      this.id = id;
+      this.numNodesToAdd = numNodesToAdd;
+      this.client = newSolrZkClient();
+    }
+    /** returns the number of nodes actually added w/o error */
+    public Integer call() {
+      running = true;
+      // NOTE: test includes 'running'
+      for (int i = 0; running && i < numNodesToAdd; i++) {
+        final String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/thrasher-" + id + "-" + i;
+        try {
+          client.makePath(nodePath, CreateMode.EPHEMERAL, true);
+          numAdded++;
+        } catch (Exception e) {
+          log.error("failed to create: " + nodePath, e);
+        }
+      }
+      return numAdded;
+    }
+    public int getNumAdded() {
+      return numAdded;
+    }
+    public void close() {
+      client.close();
+    }
+    public void stop() {
+      running = false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f45fae5c/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 497f15c..6a9209c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -38,6 +38,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.common.Callable;
 import org.apache.solr.common.SolrException;
@@ -444,6 +445,10 @@ public class ZkStateReader implements Closeable {
       final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, Collections.<String> emptySet(),
           CLUSTER_STATE);
       synchronized (getUpdateLock()) {
+        if (this.legacyClusterStateVersion >= stat.getVersion()) {
+          // Nothing to do, someone else updated same or newer.
+          return;
+        }
         this.legacyCollectionStates = loadedData.getCollectionStates();
         this.legacyClusterStateVersion = stat.getVersion();
       }
@@ -466,6 +471,9 @@ public class ZkStateReader implements Closeable {
     }
   }
 
+  // We don't get a Stat or track versions on getChildren() calls, so force linearization.
+  private final Object refreshCollectionListLock = new Object();
+
   /**
    * Search for any lazy-loadable state format2 collections.
    *
@@ -479,29 +487,33 @@ public class ZkStateReader implements Closeable {
    * {@link ClusterState#getCollections()} method as a safeguard against exposing wrong collection names to the users
    */
   private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
-    List<String> children = null;
-    try {
-      children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
-    } catch (KeeperException.NoNodeException e) {
-      LOG.warn("Error fetching collection names: [{}]", e.getMessage());
-      // fall through
-    }
-    if (children == null || children.isEmpty()) {
-      lazyCollectionStates.clear();
-      return;
-    }
-
     // Don't mess with watchedCollections, they should self-manage.
+    synchronized (refreshCollectionListLock) {
+      List<String> children = null;
+      try {
+        children = zkClient.getChildren(COLLECTIONS_ZKNODE, watcher, true);
+      } catch (KeeperException.NoNodeException e) {
+        LOG.warn("Error fetching collection names: [{}]", e.getMessage());
+        // fall through
+      }
+      if (children == null || children.isEmpty()) {
+        lazyCollectionStates.clear();
+        return;
+      }
 
-    // First, drop any children that disappeared.
-    this.lazyCollectionStates.keySet().retainAll(children);
-    for (String coll : children) {
-      // We will create an eager collection for any interesting collections, so don't add to lazy.
-      if (!interestingCollections.contains(coll)) {
-        // Double check contains just to avoid allocating an object.
-        LazyCollectionRef existing = lazyCollectionStates.get(coll);
-        if (existing == null) {
-          lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
+      // Don't lock getUpdateLock() here, we don't need it and it would cause deadlock.
+      // Don't mess with watchedCollections, they should self-manage.
+
+      // First, drop any children that disappeared.
+      this.lazyCollectionStates.keySet().retainAll(children);
+      for (String coll : children) {
+        // We will create an eager collection for any interesting collections, so don't add to lazy.
+        if (!interestingCollections.contains(coll)) {
+          // Double check contains just to avoid allocating an object.
+          LazyCollectionRef existing = lazyCollectionStates.get(coll);
+          if (existing == null) {
+            lazyCollectionStates.putIfAbsent(coll, new LazyCollectionRef(coll));
+          }
         }
       }
     }
@@ -533,19 +545,37 @@ public class ZkStateReader implements Closeable {
     }
   }
 
+  // We don't get a Stat or track versions on getChildren() calls, so force linearization.
+  private final Object refreshLiveNodesLock = new Object();
+  // Ensures that only the latest getChildren fetch gets applied.
+  private final AtomicReference<Set<String>> lastFetchedLiveNodes = new AtomicReference<>();
+
   /**
    * Refresh live_nodes.
    */
   private void refreshLiveNodes(Watcher watcher) throws KeeperException, InterruptedException {
-    Set<String> newLiveNodes;
-    try {
-      List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
-      LOG.debug("Updating live nodes from ZooKeeper... [{}]", nodeList.size());
-      newLiveNodes = new HashSet<>(nodeList);
-    } catch (KeeperException.NoNodeException e) {
-      newLiveNodes = emptySet();
+    synchronized (refreshLiveNodesLock) {
+      Set<String> newLiveNodes;
+      try {
+        List<String> nodeList = zkClient.getChildren(LIVE_NODES_ZKNODE, watcher, true);
+        LOG.debug("Updating live nodes from ZooKeeper... [{}]", nodeList.size());
+        newLiveNodes = new HashSet<>(nodeList);
+      } catch (KeeperException.NoNodeException e) {
+        newLiveNodes = emptySet();
+      }
+      lastFetchedLiveNodes.set(newLiveNodes);
     }
+
+    // Can't lock getUpdateLock() until we release the other, it would cause deadlock.
+    Set<String> oldLiveNodes, newLiveNodes;
     synchronized (getUpdateLock()) {
+      newLiveNodes = lastFetchedLiveNodes.getAndSet(null);
+      if (newLiveNodes == null) {
+        // Someone else won the race to apply the last update, just exit.
+        return;
+      }
+
+      oldLiveNodes = this.liveNodes;
       this.liveNodes = newLiveNodes;
       if (clusterState != null) {
         clusterState.setLiveNodes(newLiveNodes);


[2/2] lucene-solr:branch_5_5: SOLR-8973: Zookeeper frenzy when a core is first created.

Posted by dr...@apache.org.
SOLR-8973: Zookeeper frenzy when a core is first created.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/802ee6f8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/802ee6f8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/802ee6f8

Branch: refs/heads/branch_5_5
Commit: 802ee6f8c90ba6462f29c963a760aa1dfeeb09b1
Parents: f45fae5
Author: Scott Blum <dr...@apache.org>
Authored: Tue Apr 19 19:39:55 2016 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Wed Apr 20 18:29:49 2016 -0400

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  2 +
 .../org/apache/solr/cloud/ZkController.java     | 12 ++--
 .../solr/cloud/overseer/ZkStateReaderTest.java  | 60 ++++++++++++++++++++
 .../apache/solr/common/cloud/ZkStateReader.java | 36 ++++++------
 4 files changed, 89 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/802ee6f8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 017b837..aa4ef6f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -75,6 +75,8 @@ Bug Fixes
 * SOLR-8914: ZkStateReader's refreshLiveNodes(Watcher) is not thread safe. (Scott Blum, hoss,
   sarowe, Erick Erickson, Mark Miller, shalin)
 
+* SOLR-8973: Zookeeper frenzy when a core is first created. (Janmejay Singh, Scott Blum, shalin)
+
 ======================= 5.5.0 =======================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/802ee6f8/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a5da683..e7b5f50 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1525,11 +1525,13 @@ public final class ZkController {
       }
 
       publish(cd, Replica.State.DOWN, false, true);
-      DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
-      if (collection != null) {
-        log.info("Registering watch for collection {}", cd.getCloudDescriptor().getCollectionName());
-        zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
-      }
+      String collectionName = cd.getCloudDescriptor().getCollectionName();
+      DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
+      log.info(collection == null ?
+              "Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" :
+              "Registering watch for collection {}",
+          collectionName);
+      zkStateReader.addCollectionWatch(collectionName);
     } catch (KeeperException e) {
       log.error("", e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/802ee6f8/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 69626b0..fc52ef5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -26,6 +26,7 @@ import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerTest;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.Slice;
@@ -180,4 +181,63 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       server.shutdown();
     }
   }
+
+  public void testWatchedCollectionCreation() throws Exception {
+    String zkDir = createTempDir("testWatchedCollectionCreation").toFile().getAbsolutePath();
+    // A collection watched via addCollectionWatch() before it exists must become eagerly loaded once created.
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      ZkStateReader reader = new ZkStateReader(zkClient);
+      reader.createClusterStateWatchersAndUpdate();
+      reader.addCollectionWatch("c1");
+
+      // Initially there should be no c1 collection.
+      assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+      reader.updateClusterState();
+
+      // Still no c1 collection, despite a collection path.
+      assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+      ZkStateWriter writer = new ZkStateWriter(reader, new Overseer.Stats());
+
+
+      // create new collection with stateFormat = 2
+      DocCollection state = new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
+      ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+      writer.enqueueUpdate(reader.getClusterState(), wc, null);
+      writer.writePendingUpdates();
+
+      assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
+
+      // No forced update here: rely on the watch from addCollectionWatch("c1") to pick up the new state.json.
+
+      for (int i = 0; i < 100; ++i) {
+        Thread.sleep(50);
+        ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+        if (ref != null) {
+          break;
+        }
+      }
+      ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
+      assertNotNull(ref);
+      assertFalse(ref.isLazilyLoaded());
+      assertEquals(2, ref.get().getStateFormat());
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/802ee6f8/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 6a9209c..49b02a2 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -403,13 +403,6 @@ public class ZkStateReader implements Closeable {
     // To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
     Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
 
-    // Are there any interesting collections that disappeared from the legacy cluster state?
-    for (String coll : interestingCollections) {
-      if (!result.containsKey(coll) && !watchedCollectionStates.containsKey(coll)) {
-        new StateWatcher(coll).refreshAndWatch(true);
-      }
-    }
-  
     // Add state format2 collections, but don't override legacy collection states.
     for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
       if (!result.containsKey(entry.getKey())) {
@@ -997,15 +990,26 @@ public class ZkStateReader implements Closeable {
 
   private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
     String collectionPath = getCollectionPath(coll);
-    try {
-      Stat stat = new Stat();
-      byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
-      ClusterState state = ClusterState.load(stat.getVersion(), data,
-              Collections.<String>emptySet(), collectionPath);
-      ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
-      return collectionRef == null ? null : collectionRef.get();
-    } catch (KeeperException.NoNodeException e) {
-      return null;
+    while (true) {
+      try {
+        Stat stat = new Stat();
+        byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
+        ClusterState state = ClusterState.load(stat.getVersion(), data,
+            Collections.<String>emptySet(), collectionPath);
+        ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
+        return collectionRef == null ? null : collectionRef.get();
+      } catch (KeeperException.NoNodeException e) {
+        if (watcher != null) {
+          // Leave an exists watch in place in case a state.json is created later.
+          Stat exists = zkClient.exists(collectionPath, watcher, true);
+          if (exists != null) {
+            // Rare race condition, we tried to fetch the data and couldn't find it, then we found it exists.
+            // Loop and try again.
+            continue;
+          }
+        }
+        return null;
+      }
     }
   }