You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/04/10 00:42:13 UTC
svn commit: r1672513 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/test/org/apache/solr/cloud/ solr/solrj/
solr/solrj/src/java/org/apache/solr/common/cloud/
Author: markrmiller
Date: Thu Apr 9 22:42:12 2015
New Revision: 1672513
URL: http://svn.apache.org/r1672513
Log:
SOLR-7066: autoAddReplicas feature has bug when selecting replacement nodes.
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1672513&r1=1672512&r2=1672513&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Thu Apr 9 22:42:12 2015
@@ -44,6 +44,8 @@ Bug Fixes
* SOLR-6709: Fix QueryResponse to deal with the "expanded" section when using the XMLResponseParser
(Varun Thacker, Joel Bernstein)
+* SOLR-7066: autoAddReplicas feature has bug when selecting replacement nodes. (Mark Miller)
+
Optimizations
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java?rev=1672513&r1=1672512&r2=1672513&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerAutoReplicaFailoverThread.java Thu Apr 9 22:42:12 2015
@@ -149,7 +149,7 @@ public class OverseerAutoReplicaFailover
ClusterState clusterState = zkStateReader.getClusterState();
//check if we have disabled autoAddReplicas cluster wide
String autoAddReplicas = (String) zkStateReader.getClusterProps().get(ZkStateReader.AUTO_ADD_REPLICAS);
- if (autoAddReplicas !=null && autoAddReplicas.equals("false")) {
+ if (autoAddReplicas != null && autoAddReplicas.equals("false")) {
return;
}
if (clusterState != null) {
@@ -164,15 +164,17 @@ public class OverseerAutoReplicaFailover
lastClusterStateVersion = clusterState.getZkClusterStateVersion();
Set<String> collections = clusterState.getCollections();
for (final String collection : collections) {
+ log.debug("look at collection={}", collection);
DocCollection docCollection = clusterState.getCollection(collection);
if (!docCollection.getAutoAddReplicas()) {
+ log.debug("Collection {} is not setup to use autoAddReplicas, skipping..", docCollection.getName());
continue;
}
if (docCollection.getReplicationFactor() == null) {
log.debug("Skipping collection because it has no defined replicationFactor, name={}", docCollection.getName());
continue;
}
- log.debug("Found collection, name={} replicationFactor=", collection, docCollection.getReplicationFactor());
+ log.debug("Found collection, name={} replicationFactor={}", collection, docCollection.getReplicationFactor());
Collection<Slice> slices = docCollection.getSlices();
for (Slice slice : slices) {
@@ -182,7 +184,7 @@ public class OverseerAutoReplicaFailover
int goodReplicas = findDownReplicasInSlice(clusterState, docCollection, slice, downReplicas);
- log.debug("replicationFactor={} goodReplicaCount={}", docCollection.getReplicationFactor(), goodReplicas);
+ log.debug("collection={} replicationFactor={} goodReplicaCount={}", docCollection.getName(), docCollection.getReplicationFactor(), goodReplicas);
if (downReplicas.size() > 0 && goodReplicas < docCollection.getReplicationFactor()) {
// badReplicaMap.put(collection, badReplicas);
@@ -199,7 +201,7 @@ public class OverseerAutoReplicaFailover
private void processBadReplicas(final String collection, final Collection<DownReplica> badReplicas) {
for (DownReplica badReplica : badReplicas) {
- log.debug("process down replica {}", badReplica.replica.getName());
+ log.debug("process down replica={} from collection={}", badReplica.replica.getName(), collection);
String baseUrl = badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP);
Long wentBadAtNS = baseUrlForBadNodes.getIfPresent(baseUrl);
if (wentBadAtNS == null) {
@@ -252,7 +254,7 @@ public class OverseerAutoReplicaFailover
});
// wait to see state for core we just created
- boolean success = ClusterStateUtil.waitToSeeLive(zkStateReader, collection, coreNodeName, createUrl, 30000);
+ boolean success = ClusterStateUtil.waitToSeeLiveReplica(zkStateReader, collection, coreNodeName, createUrl, 30000);
if (!success) {
log.error("Creating new replica appears to have failed, timed out waiting to see created SolrCore register in the clusterstate.");
return false;
@@ -304,8 +306,9 @@ public class OverseerAutoReplicaFailover
assert badReplica != null;
assert badReplica.collection != null;
assert badReplica.slice != null;
- Map<String,Counts> counts = new HashMap<>();
- ValueComparator vc = new ValueComparator(counts);
+ log.debug("getBestCreateUrl for " + badReplica.replica);
+ Map<String,Counts> counts = new HashMap<String, Counts>();
+ Set<String> unsuitableHosts = new HashSet<String>();
Set<String> liveNodes = new HashSet<>(zkStateReader.getClusterState().getLiveNodes());
@@ -320,20 +323,20 @@ public class OverseerAutoReplicaFailover
for (Slice slice : slices) {
// only look at active shards
if (slice.getState() == Slice.State.ACTIVE) {
- log.debug("look at slice {} as possible create candidate", slice.getName());
+ log.debug("look at slice {} for collection {} as possible create candidate", slice.getName(), collection);
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
liveNodes.remove(replica.getNodeName());
- if (replica.getStr(ZkStateReader.BASE_URL_PROP).equals(
+ String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+ if (baseUrl.equals(
badReplica.replica.getStr(ZkStateReader.BASE_URL_PROP))) {
continue;
}
- String baseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
// on a live node?
- log.debug("nodename={} livenodes={}", replica.getNodeName(), clusterState.getLiveNodes());
+ log.debug("collection={} nodename={} livenodes={}", collection, replica.getNodeName(), clusterState.getLiveNodes());
boolean live = clusterState.liveNodesContain(replica.getNodeName());
- log.debug("look at replica {} as possible create candidate, live={}", replica.getName(), live);
+ log.debug("collection={} look at replica {} as possible create candidate, live={}", collection, replica.getName(), live);
if (live) {
Counts cnt = counts.get(baseUrl);
if (cnt == null) {
@@ -351,8 +354,12 @@ public class OverseerAutoReplicaFailover
// TODO: this is collection wide and we want to take into
// account cluster wide - use new cluster sys prop
- int maxShardsPerNode = docCollection.getMaxShardsPerNode();
- log.debug("max shards per node={} good replicas={}", maxShardsPerNode, cnt);
+ Integer maxShardsPerNode = badReplica.collection.getMaxShardsPerNode();
+ if (maxShardsPerNode == null) {
+ log.warn("maxShardsPerNode is not defined for collection, name=" + badReplica.collection.getName());
+ maxShardsPerNode = Integer.MAX_VALUE;
+ }
+ log.debug("collection={} node={} max shards per node={} potential hosts={}", collection, baseUrl, maxShardsPerNode, cnt);
Collection<Replica> badSliceReplicas = null;
DocCollection c = clusterState.getCollection(badReplica.collection.getName());
@@ -363,10 +370,13 @@ public class OverseerAutoReplicaFailover
}
}
boolean alreadyExistsOnNode = replicaAlreadyExistsOnNode(zkStateReader.getClusterState(), badSliceReplicas, badReplica, baseUrl);
- if (alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
- counts.remove(replica.getStr(ZkStateReader.BASE_URL_PROP));
+ if (unsuitableHosts.contains(baseUrl) || alreadyExistsOnNode || cnt.collectionShardsOnNode >= maxShardsPerNode) {
+ counts.remove(baseUrl);
+ unsuitableHosts.add(baseUrl);
+ log.debug("not a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
} else {
- counts.put(replica.getStr(ZkStateReader.BASE_URL_PROP), cnt);
+ counts.put(baseUrl, cnt);
+ log.debug("is a candidate node, collection={} node={} max shards per node={} good replicas={}", collection, baseUrl, maxShardsPerNode, cnt);
}
}
}
@@ -380,32 +390,35 @@ public class OverseerAutoReplicaFailover
}
if (counts.size() == 0) {
+ log.debug("no suitable hosts found for getBestCreateUrl for collection={}", badReplica.collection.getName());
return null;
}
- Map<String,Counts> sortedCounts = new TreeMap<>(vc);
+ ValueComparator vc = new ValueComparator(counts);
+ Map<String,Counts> sortedCounts = new TreeMap<String, Counts>(vc);
sortedCounts.putAll(counts);
- log.debug("empty nodes={}", liveNodes);
- log.debug("sorted hosts={}", sortedCounts);
+ log.debug("empty nodes={} for collection={}", liveNodes, badReplica.collection.getName());
+ log.debug("sorted hosts={} for collection={}", sortedCounts, badReplica.collection.getName());
+ log.debug("unsuitable hosts={} for collection={}", unsuitableHosts, badReplica.collection.getName());
return sortedCounts.keySet().iterator().next();
}
private static boolean replicaAlreadyExistsOnNode(ClusterState clusterState, Collection<Replica> replicas, DownReplica badReplica, String baseUrl) {
if (replicas != null) {
- log.debug("check if replica already exists on node using replicas {}", getNames(replicas));
+ log.debug("collection={} check if replica already exists on node using replicas {}", badReplica.collection.getName(), getNames(replicas));
for (Replica replica : replicas) {
final Replica.State state = replica.getState();
if (!replica.getName().equals(badReplica.replica.getName()) && replica.getStr(ZkStateReader.BASE_URL_PROP).equals(baseUrl)
&& clusterState.liveNodesContain(replica.getNodeName())
&& (state == Replica.State.ACTIVE || state == Replica.State.DOWN || state == Replica.State.RECOVERING)) {
- log.debug("replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.replica.getName(), replica.getName(), replica.getNodeName());
+ log.debug("collection={} replica already exists on node, bad replica={}, existing replica={}, node name={}", badReplica.collection.getName(), badReplica.replica.getName(), replica.getName(), replica.getNodeName());
return true;
}
}
}
- log.debug("replica does not yet exist on node: {}", baseUrl);
+ log.debug("collection={} replica does not yet exist on node: {}", badReplica.collection.getName(), baseUrl);
return false;
}
@@ -484,7 +497,7 @@ public class OverseerAutoReplicaFailover
@Override
public String toString() {
return "Counts [negRankingWeight=" + negRankingWeight + ", sameSliceCount="
- + ourReplicas + "]";
+ + ourReplicas + ", collectionShardsOnNode=" + collectionShardsOnNode + "]";
}
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java?rev=1672513&r1=1672512&r2=1672513&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java Thu Apr 9 22:42:12 2015
@@ -17,6 +17,8 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
+import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
@@ -40,7 +42,6 @@ import org.apache.solr.client.solrj.requ
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.hdfs.HdfsTestUtil;
import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionParams;
@@ -51,7 +52,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
@Nightly
@Slow
@@ -146,37 +146,66 @@ public class SharedFSAutoReplicaFailover
assertTrue(response2.isSuccess());
waitForRecoveriesToFinish(collection2, false);
+
+ String collection3 = "solrj_collection3";
+ createCollectionRequest = new Create();
+ createCollectionRequest.setCollectionName(collection3);
+ createCollectionRequest.setNumShards(5);
+ createCollectionRequest.setReplicationFactor(1);
+ createCollectionRequest.setMaxShardsPerNode(1);
+ createCollectionRequest.setConfigName("conf1");
+ createCollectionRequest.setRouterField("myOwnField");
+ createCollectionRequest.setAutoAddReplicas(true);
+ CollectionAdminResponse response3 = createCollectionRequest.process(getCommonCloudSolrClient());
+
+ assertEquals(0, response3.getStatus());
+ assertTrue(response3.isSuccess());
+
+ waitForRecoveriesToFinish(collection3, false);
ChaosMonkey.stop(jettys.get(1));
ChaosMonkey.stop(jettys.get(2));
- Thread.sleep(3000);
+ Thread.sleep(5000);
- assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
+ assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
assertSliceAndReplicaCount(collection1);
- assertEquals(4, getLiveAndActiveCount(collection1));
- assertTrue(getLiveAndActiveCount(collection2) < 4);
+ assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
+ assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
+ // collection3 has maxShardsPerNode=1, there are 4 standard jetties and one control jetty and 2 nodes stopped
+ ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 3, 30000);
+
+ // collection1 should still be at 4
+ assertEquals(4, ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
+ // and collection2 less than 4
+ assertTrue(ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection2) < 4);
+
ChaosMonkey.stop(jettys);
ChaosMonkey.stop(controlJetty);
- assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllNotLive(cloudClient.getZkStateReader(), 45000));
+ assertTrue("Timeout waiting for all not live", ClusterStateUtil.waitForAllReplicasNotLive(cloudClient.getZkStateReader(), 45000));
ChaosMonkey.start(jettys);
ChaosMonkey.start(controlJetty);
- assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 120000));
+ assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 120000));
assertSliceAndReplicaCount(collection1);
-
+ assertSingleReplicationAndShardSize(collection3, 5);
+
int jettyIndex = random().nextInt(jettys.size());
ChaosMonkey.stop(jettys.get(jettyIndex));
ChaosMonkey.start(jettys.get(jettyIndex));
-
- assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
-
+
+ assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
+
+ assertSliceAndReplicaCount(collection1);
+
+ assertSingleReplicationAndShardSize(collection3, 5);
+ ClusterStateUtil.waitForLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection3, 5, 30000);
//disable autoAddReplicas
Map m = makeMap(
"action", CollectionParams.CollectionAction.CLUSTERPROP.toLower(),
@@ -187,7 +216,7 @@ public class SharedFSAutoReplicaFailover
request.setPath("/admin/collections");
cloudClient.request(request);
- int currentCount = getLiveAndActiveCount(collection1);
+ int currentCount = ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1);
ChaosMonkey.stop(jettys.get(3));
@@ -195,7 +224,7 @@ public class SharedFSAutoReplicaFailover
//Hence waiting for 30 seconds to be on the safe side.
Thread.sleep(30000);
//Ensures that autoAddReplicas has not kicked in.
- assertTrue(currentCount > getLiveAndActiveCount(collection1));
+ assertTrue(currentCount > ClusterStateUtil.getLiveAndActiveReplicaCount(cloudClient.getZkStateReader(), collection1));
//enable autoAddReplicas
m = makeMap(
@@ -206,24 +235,17 @@ public class SharedFSAutoReplicaFailover
request.setPath("/admin/collections");
cloudClient.request(request);
- assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLive(cloudClient.getZkStateReader(), collection1, 60000));
+ assertTrue("Timeout waiting for all live and active", ClusterStateUtil.waitForAllActiveAndLiveReplicas(cloudClient.getZkStateReader(), collection1, 60000));
assertSliceAndReplicaCount(collection1);
}
-
- private int getLiveAndActiveCount(String collection1) {
+
+ private void assertSingleReplicationAndShardSize(String collection, int numSlices) {
Collection<Slice> slices;
- slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection1);
- int liveAndActive = 0;
+ slices = cloudClient.getZkStateReader().getClusterState().getActiveSlices(collection);
+ assertEquals(numSlices, slices.size());
for (Slice slice : slices) {
- for (Replica replica : slice.getReplicas()) {
- boolean live = cloudClient.getZkStateReader().getClusterState().liveNodesContain(replica.getNodeName());
- boolean active = replica.getState() == Replica.State.ACTIVE;
- if (live && active) {
- liveAndActive++;
- }
- }
+ assertEquals(1, slice.getReplicas().size());
}
- return liveAndActive;
}
private void assertSliceAndReplicaCount(String collection) {
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java?rev=1672513&r1=1672512&r2=1672513&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterStateUtil.java Thu Apr 9 22:42:12 2015
@@ -41,8 +41,8 @@ public class ClusterStateUtil {
* how long to wait before giving up
* @return false if timed out
*/
- public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, int timeoutInMs) {
- return waitForAllActiveAndLive(zkStateReader, null, timeoutInMs);
+ public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, int timeoutInMs) {
+ return waitForAllActiveAndLiveReplicas(zkStateReader, null, timeoutInMs);
}
/**
@@ -55,12 +55,12 @@ public class ClusterStateUtil {
* how long to wait before giving up
* @return false if timed out
*/
- public static boolean waitForAllActiveAndLive(ZkStateReader zkStateReader, String collection,
+ public static boolean waitForAllActiveAndLiveReplicas(ZkStateReader zkStateReader, String collection,
int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
- while (System.nanoTime() < timeout) {
+ while (!success && System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
@@ -119,7 +119,7 @@ public class ClusterStateUtil {
* how long to wait before giving up
* @return false if timed out
*/
- public static boolean waitToSeeLive(ZkStateReader zkStateReader,
+ public static boolean waitToSeeLiveReplica(ZkStateReader zkStateReader,
String collection, String coreNodeName, String baseUrl,
int timeoutInMs) {
long timeout = System.nanoTime()
@@ -162,17 +162,17 @@ public class ClusterStateUtil {
return false;
}
- public static boolean waitForAllNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
- return waitForAllNotLive(zkStateReader, null, timeoutInMs);
+ public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader, int timeoutInMs) {
+ return waitForAllReplicasNotLive(zkStateReader, null, timeoutInMs);
}
- public static boolean waitForAllNotLive(ZkStateReader zkStateReader,
+ public static boolean waitForAllReplicasNotLive(ZkStateReader zkStateReader,
String collection, int timeoutInMs) {
long timeout = System.nanoTime()
+ TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
boolean success = false;
- while (System.nanoTime() < timeout) {
+ while (!success && System.nanoTime() < timeout) {
success = true;
ClusterState clusterState = zkStateReader.getClusterState();
if (clusterState != null) {
@@ -213,6 +213,44 @@ public class ClusterStateUtil {
}
return success;
+ }
+
+ public static int getLiveAndActiveReplicaCount(ZkStateReader zkStateReader, String collection) {
+ Collection<Slice> slices;
+ slices = zkStateReader.getClusterState().getActiveSlices(collection);
+ int liveAndActive = 0;
+ for (Slice slice : slices) {
+ for (Replica replica : slice.getReplicas()) {
+ boolean live = zkStateReader.getClusterState().liveNodesContain(replica.getNodeName());
+ boolean active = replica.getState() == Replica.State.ACTIVE;
+ if (live && active) {
+ liveAndActive++;
+ }
+ }
+ }
+ return liveAndActive;
+ }
+
+ public static boolean waitForLiveAndActiveReplicaCount(ZkStateReader zkStateReader,
+ String collection, int replicaCount, int timeoutInMs) {
+ long timeout = System.nanoTime()
+ + TimeUnit.NANOSECONDS.convert(timeoutInMs, TimeUnit.MILLISECONDS);
+ boolean success = false;
+ while (!success && System.nanoTime() < timeout) {
+ success = getLiveAndActiveReplicaCount(zkStateReader, collection) == replicaCount;
+
+ if (!success) {
+ try {
+ Thread.sleep(TIMEOUT_POLL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted");
+ }
+ }
+
+ }
+
+ return success;
}
public static boolean isAutoAddReplicas(ZkStateReader reader, String collection) {