You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/04/08 23:49:21 UTC

svn commit: r1672201 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/

Author: markrmiller
Date: Wed Apr  8 21:49:21 2015
New Revision: 1672201

URL: http://svn.apache.org/r1672201
Log:
SOLR-7066: autoAddReplicas feature has a bug when selecting replacement nodes.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1672201&r1=1672200&r2=1672201&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Apr  8 21:49:21 2015
@@ -96,6 +96,8 @@ Bug Fixes
 * SOLR-6709: Fix QueryResponse to deal with the "expanded" section when using the XMLResponseParser
   (Varun Thacker, Joel Bernstein)
 
+* SOLR-7066: autoAddReplicas feature has bug when selecting replacement nodes. (Mark Miller)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java?rev=1672201&r1=1672200&r2=1672201&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java Wed Apr  8 21:49:21 2015
@@ -149,7 +149,7 @@ public class OverseerAutoReplicaFailover
     ClusterState clusterState = zkStateReader.getClusterState();
     //check if we have disabled autoAddReplicas cluster wide
     String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS);
-    if (autoAddReplicas !=null && autoAddReplicas.equals("false")) {
+    if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
       return;
     }
     if (clusterState != null) {
@@ -164,15 +164,17 @@ public class OverseerAutoReplicaFailover
       lastClusterStateVersion = clusterState.getZkClusterStateVersion();
       Set<String> collections = clusterState.getCollections();
       for (final String collection : collections) {
+        log.debug("look at collection={}", collection);
         DocCollection docCollection = clusterState.getCollection(collection);
         if (!docCollection.getAutoAddReplicas()) {
+          log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
           continue;
         }
         if (docCollection.getReplicationFactor() == null) {
           log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
           continue;
         }
-        log.debug("Found collection, name={} replicationFactor=", collection, docCollection.getReplicationFactor());
+        log.debug("Found collection, name={} replicationFactor={}", collection, docCollection.getReplicationFactor());
         
         Collection<Slice> slices = docCollection.getSlices();
         for (Slice slice : slices) {
@@ -182,7 +184,7 @@ public class OverseerAutoReplicaFailover
             
             int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
             
-            log.debug("replicationFactor={} goodReplicaCount={}", docCollection.getReplicationFactor(), goodReplicas);
+            log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
             
             if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
               // badReplicaMap.put(collection, badReplicas);
@@ -199,7 +201,7 @@ public class OverseerAutoReplicaFailover
 
   private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
     for (DownReplica badReplica : badReplicas) {
-      log.debug("process down replica {}", badReplica.replica.getName());
+      log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
       String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
       Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
       if (wentBadAtNS == null) {
@@ -252,7 +254,7 @@ public class OverseerAutoReplicaFailover
       });
       
       // wait to see state for core we just created
-      boolean success = ClusterStateUtil.waitToSeeLive(zkStateReader, collection, coreNodeName, createUrl, 30000);
+      boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
       if (!success) {
         log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
         return false;
@@ -304,8 +306,9 @@ public class OverseerAutoReplicaFailover
     assert badReplica != null;
     assert badReplica.collection != null;
     assert badReplica.slice != null;
-    Map<String,Counts> counts = new HashMap<>();
-    ValueComparator vc = new ValueComparator(counts);
+    log.debug("getBestCreateUrl for " + badReplica.replica);
+    Map<String,Counts> counts = new HashMap<String, Counts>();
+    Set<String> unsuitableHosts = new HashSet<String>();
     
     Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
     
@@ -320,20 +323,20 @@ public class OverseerAutoReplicaFailover
         for (Slice slice : slices) {
           // only look at active shards
           if (slice.getState() == Slice.State.ACTIVE) {
-            log.debug("look at slice {} as possible create candidate", slice.getName()); 
+            log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection); 
             Collection<Replica> replicas = slice.getReplicas();
 
             for (Replica replica : replicas) {
               liveNodes.remove(replica.getNodeName());
-              if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals(
+              String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+              if (baseUrl.equals(
                   badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
                 continue;
               }
-              String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
               // on a live node?
-              log.debug("nodename={} livenodes={}", replica.getNodeName(), clusterState.getLiveNodes());
+              log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
               boolean live = clusterState.liveNodesContain(replica.getNodeName());
-              log.debug("look at replica {} as possible create candidate, live={}", replica.getName(), live); 
+              log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live); 
               if (live) {
                 Counts cnt = counts.get(baseUrl);
                 if (cnt == null) {
@@ -351,8 +354,12 @@ public class OverseerAutoReplicaFailover
                 
                 // TODO: this is collection wide and we want to take into
                 // account cluster wide - use new cluster sys prop
-                int maxShardsPerNode = docCollection.getMaxShardsPerNode();
-                log.debug("max shards per node={} good replicas={}", maxShardsPerNode, cnt);
+                Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
+                if (maxShardsPerNode == null) {
+                  log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
+                  maxShardsPerNode = Integer.MAX_VALUE;
+                }
+                log.debug("collection={} node={} max shards per node={} potential hosts={}", collection, baseUrl, maxShardsPerNode, cnt);
                 
                 Collection<Replica> badSliceReplicas = null;
                 DocCollection c = clusterState.getCollection(badReplica.collection.getName());
@@ -363,10 +370,13 @@ public class OverseerAutoReplicaFailover
                   }
                 }
                 boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
-                if (alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
-                  counts.remove(replica.getStr(ZkStateReader.BASE_URL_PROP));
+                if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
+                  counts.remove(baseUrl);
+                  unsuitableHosts.add(baseUrl);
+                  log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
                 } else {
-                  counts.put(replica.getStr(ZkStateReader.BASE_URL_PROP), cnt);
+                  counts.put(baseUrl, cnt);
+                  log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
                 }
               }
             }
@@ -380,32 +390,35 @@ public class OverseerAutoReplicaFailover
     }
     
     if (counts.size() == 0) {
+      log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
       return null;
     }
     
-    Map<String,Counts> sortedCounts = new TreeMap<>(vc);
+    ValueComparator vc = new ValueComparator(counts);
+    Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
     sortedCounts.putAll(counts);
     
-    log.debug("empty nodes={}", liveNodes);
-    log.debug("sorted hosts={}", sortedCounts);
+    log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
+    log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
+    log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
     
     return sortedCounts.keySet().iterator().next();
   }
   
   private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
     if (replicas != null) {
-      log.debug("check if replica already exists on node using replicas {}", getNames(replicas));
+      log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
       for (Replica replica : replicas) {
         final Replica.State state = replica.getState();
         if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
             && clusterState.liveNodesContain(replica.getNodeName())
             && (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
-          log.debug("replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.replica.getName(), replica.getName(), replica.getNodeName());
+          log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}",  badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
           return true;
         }
       }
     }
-    log.debug("replica does not yet exist on node: {}", baseUrl);
+    log.debug("collection={} replica does not yet exist on node: {}",  badReplica.collection.getName(), baseUrl);
     return false;
   }
   
@@ -484,7 +497,7 @@ public class OverseerAutoReplicaFailover
     @Override
     public String toString() {
       return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
-          + ourReplicas + "]";
+          + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
     }
   }
   

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java?rev=1672201&r1=1672200&r2=1672201&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java Wed Apr  8 21:49:21 2015
@@ -17,6 +17,8 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
@@ -28,8 +30,6 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
-
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -40,7 +40,6 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.cloud.hdfs.HdfsTestUtil;
 import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
@@ -51,7 +50,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
 
 @Nightly
 @Slow
@@ -148,37 +148,66 @@ public class SharedFSAutoReplicaFailover
     assertTrue(response2.isSuccess());
     
     waitForRecoveriesToFinish(collection2, false);
+    
+    String collection3 = "solrj_collection3";
+    createCollectionRequest = new Create();
+    createCollectionRequest.setCollectionName(collection3);
+    createCollectionRequest.setNumShards(5);
+    createCollectionRequest.setReplicationFactor(1);
+    createCollectionRequest.setMaxShardsPerNode(1);
+    createCollectionRequest.setConfigName("conf1");
+    createCollectionRequest.setRouterField("myOwnField");
+    createCollectionRequest.setAutoAddReplicas(true);
+    CollectionAdminResponse response3 = createCollectionRequest.process(getCommonCloudSolrClient());
+
+    assertEquals(0, response3.getStatus());
+    assertTrue(response3.isSuccess());
+    
+    waitForRecoveriesToFinish(collection3, false);
 
     ChaosMonkey.stop(jettys.get(1));
     ChaosMonkey.stop(jettys.get(2));
 
-    Thread.sleep(3000);
+    Thread.sleep(5000);
 
-    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
+    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
 
     assertSliceAndReplicaCount(collection1);
 
-    assertEquals(4, getLiveAndActiveCount(collection1));
-    assertTrue(getLiveAndActiveCount(collection2) < 4);
+    assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
+    assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
 
+    // collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped
+    ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000);
+    
+    // collection1 should still be at 4
+    assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
+    // and collection2 less than 4
+    assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
+    
     ChaosMonkey.stop(jettys);
     ChaosMonkey.stop(controlJetty);
 
-    assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllNotLive(cloudClient.getZkStateReader(), 45000));
+    assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
 
     ChaosMonkey.start(jettys);
     ChaosMonkey.start(controlJetty);
 
-    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
+    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
 
     assertSliceAndReplicaCount(collection1);
-
+    assertSingleReplicationAndShardSize(collection3, 5);
+    
     int jettyIndex = random().nextInt(jettys.size());
     ChaosMonkey.stop(jettys.get(jettyIndex));
     ChaosMonkey.start(jettys.get(jettyIndex));
-
-    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
-
+    
+    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
+    
+    assertSliceAndReplicaCount(collection1);
+    
+    assertSingleReplicationAndShardSize(collection3, 5);
+    ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
     //disable autoAddReplicas
     Map m = makeMap(
         "action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
@@ -189,7 +218,7 @@ public class SharedFSAutoReplicaFailover
     request.setPath("/admin/collections");
     cloudClient.request(request);
 
-    int currentCount = getLiveAndActiveCount(collection1);
+    int currentCount = ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1);
 
     ChaosMonkey.stop(jettys.get(3));
 
@@ -197,7 +226,7 @@ public class SharedFSAutoReplicaFailover
     //Hence waiting for 30 seconds to be on the safe side.
     Thread.sleep(30000);
     //Ensures that autoAddReplicas has not kicked in.
-    assertTrue(currentCount > getLiveAndActiveCount(collection1));
+    assertTrue(currentCount > ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
 
     //enable autoAddReplicas
     m = makeMap(
@@ -208,24 +237,17 @@ public class SharedFSAutoReplicaFailover
     request.setPath("/admin/collections");
     cloudClient.request(request);
 
-    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
+    assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
     assertSliceAndReplicaCount(collection1);
   }
-
-  private int getLiveAndActiveCount(String collection1) {
+  
+  private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
     Collection<Slice> slices;
-    slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection1);
-    int liveAndActive = 0;
+    slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);
+    assertEquals(numSlices, slices.size());
     for (Slice slice : slices) {
-      for (Replica replica : slice.getReplicas()) {
-        boolean live = cloudClient.getZkStateReader().getClusterState().liveNodesContain(replica.getNodeName());
-        boolean active = replica.getState() == Replica.State.ACTIVE;
-        if (live && active) {
-          liveAndActive++;
-        }
-      }
+      assertEquals(1, slice.getReplicas().size());
     }
-    return liveAndActive;
   }
 
   private void assertSliceAndReplicaCount(String collection) {

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java?rev=1672201&r1=1672200&r2=1672201&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java Wed Apr  8 21:49:21 2015
@@ -41,8 +41,8 @@ public class ClusterStateUtil {
    *          how long to wait before giving up
    * @return false if timed out
    */
-  public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, int timeoutInMs) {
-    return waitForAllActiveAndLive(zkStateReader, null, timeoutInMs);
+  public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, int timeoutInMs) {
+    return waitForAllActiveAndLiveReplicas(zkStateReader, null, timeoutInMs);
   }
   
   /**
@@ -55,12 +55,12 @@ public class ClusterStateUtil {
    *          how long to wait before giving up
    * @return false if timed out
    */
-  public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, String collection,
+  public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, String collection,
       int timeoutInMs) {
     long timeout = System.nanoTime()
         + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
     boolean success = false;
-    while (System.nanoTime() < timeout) {
+    while (!success && System.nanoTime() < timeout) {
       success = true;
       ClusterState clusterState = zkStateReader.getClusterState();
       if (clusterState != null) {
@@ -119,7 +119,7 @@ public class ClusterStateUtil {
    *          how long to wait before giving up
    * @return false if timed out
    */
-  public static boolean waitToSeeLive(ZkStateReader zkStateReader,
+  public static boolean waitToSeeLiveReplica(ZkStateReader zkStateReader,
       String collection, String coreNodeName, String baseUrl,
       int timeoutInMs) {
     long timeout = System.nanoTime()
@@ -162,17 +162,17 @@ public class ClusterStateUtil {
     return false;
   }
   
-  public static boolean waitForAllNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
-    return waitForAllNotLive(zkStateReader, null, timeoutInMs);
+  public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
+    return waitForAllReplicasNotLive(zkStateReader, null, timeoutInMs);
   }
   
 
-  public static boolean waitForAllNotLive(ZkStateReader zkStateReader,
+  public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader,
       String collection, int timeoutInMs) {
     long timeout = System.nanoTime()
         + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
     boolean success = false;
-    while (System.nanoTime() < timeout) {
+    while (!success && System.nanoTime() < timeout) {
       success = true;
       ClusterState clusterState = zkStateReader.getClusterState();
       if (clusterState != null) {
@@ -213,6 +213,44 @@ public class ClusterStateUtil {
     }
     
     return success;
+  }
+  
+  public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
+    Collection<Slice> slices;
+    slices = zkStateReader.getClusterState().getActiveSlices(collection);
+    int liveAndActive = 0;
+    for (Slice slice : slices) {
+      for (Replica replica : slice.getReplicas()) {
+        boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
+        boolean active = replica.getState() == Replica.State.ACTIVE;
+        if (live && active) {
+          liveAndActive++;
+        }
+      }
+    }
+    return liveAndActive;
+  }
+  
+  public static boolean waitForLiveAndActiveReplicaCount(ZkStateReader zkStateReader,
+      String collection, int replicaCount, int timeoutInMs) {
+    long timeout = System.nanoTime()
+        + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
+    boolean success = false;
+    while (!success && System.nanoTime() < timeout) {
+      success = getLiveAndActiveReplicaCount(zkStateReader, collection) == replicaCount;
+      
+      if (!success) {
+        try {
+          Thread.sleep(TIMEOUT_POLL_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+        }
+      }
+      
+    }
+    
+    return success;
   }
   
   public static boolean isAutoAddReplicas(ZkStateReader reader, String collection) {