You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/04 11:26:15 UTC
lucene-solr:master: SOLR-10878: MOVEREPLICA command may lose data
when replicationFactor==1.
Repository: lucene-solr
Updated Branches:
refs/heads/master e9d33ee5a -> bc37e8b4c
SOLR-10878: MOVEREPLICA command may lose data when replicationFactor==1.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/bc37e8b4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/bc37e8b4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/bc37e8b4
Branch: refs/heads/master
Commit: bc37e8b4cc45794af9271c158ec36b2902cddc2f
Parents: e9d33ee
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jul 4 11:33:41 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jul 4 13:25:47 2017 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/AddReplicaCmd.java | 2 +-
.../org/apache/solr/cloud/DeleteReplicaCmd.java | 2 +-
.../org/apache/solr/cloud/MoveReplicaCmd.java | 43 ++++++++--
.../cloud/OverseerCollectionMessageHandler.java | 23 +++---
.../org/apache/solr/cloud/ReplaceNodeCmd.java | 43 +++++++---
.../apache/solr/cloud/MoveReplicaHDFSTest.java | 53 ++++++++++++
.../org/apache/solr/cloud/MoveReplicaTest.java | 84 +++++++++++++++++++-
8 files changed, 220 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 360916b..403b6cd 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -299,6 +299,8 @@ Bug Fixes
* SOLR-6807: CloudSolrClient's ZK state version check with the server was ignored when handleSelect=false
(David Smiley)
+* SOLR-10878: MOVEREPLICA command may lose data when replicationFactor is 1. (ab, shalin)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index 63acdd1..c42d073 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -68,7 +68,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
throws KeeperException, InterruptedException {
- log.info("addReplica() : {}", Utils.toJSONString(message));
+ log.debug("addReplica() : {}", Utils.toJSONString(message));
String collection = message.getStr(COLLECTION_PROP);
String node = message.getStr(CoreAdminParams.NODE);
String shard = message.getStr(SHARD_ID_PROP);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
index b79fa46..e71d7e8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
@@ -265,7 +265,7 @@ public class DeleteReplicaCmd implements Cmd {
try {
if (!callable.call())
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+ "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
} catch (InterruptedException | KeeperException e) {
throw e;
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index fed1398..53d05e1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -22,6 +22,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -29,6 +31,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
@@ -56,10 +59,11 @@ public class MoveReplicaCmd implements Cmd{
}
private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
- log.info("moveReplica() : {}", Utils.toJSONString(message));
+ log.debug("moveReplica() : {}", Utils.toJSONString(message));
ocmh.checkRequired(message, COLLECTION_PROP, "targetNode");
String collection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr("targetNode");
+ int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
String async = message.getStr(ASYNC);
@@ -103,14 +107,14 @@ public class MoveReplicaCmd implements Cmd{
assert slice != null;
Object dataDir = replica.get("dataDir");
if (dataDir != null && dataDir.toString().startsWith("hdfs:/")) {
- moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice);
+ moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout);
} else {
- moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice);
+ moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout);
}
}
private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
- DocCollection coll, Replica replica, Slice slice) throws Exception {
+ DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
ZkNodeProps removeReplicasProps = new ZkNodeProps(
@@ -154,7 +158,7 @@ public class MoveReplicaCmd implements Cmd{
}
private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
- DocCollection coll, Replica replica, Slice slice) throws Exception {
+ DocCollection coll, Replica replica, Slice slice, int timeout) throws Exception {
String newCoreName = Assign.buildCoreName(coll, slice.getName(), replica.getType());
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
@@ -163,20 +167,47 @@ public class MoveReplicaCmd implements Cmd{
CoreAdminParams.NAME, newCoreName);
if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
NamedList addResult = new NamedList();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ ReplaceNodeCmd.RecoveryWatcher watcher = null;
+ if (replica.equals(slice.getLeader())) {
+ watcher = new ReplaceNodeCmd.RecoveryWatcher(coll.getName(), slice.getName(),
+ replica.getName(), null, countDownLatch);
+ ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
+ }
ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
if (addResult.get("failure") != null) {
String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
" on node=%s", coll.getName(), slice.getName(), targetNode);
log.warn(errorString);
results.add("failure", errorString);
+ if (watcher != null) { // unregister
+ ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
+ }
return;
}
+ // wait for the other replica to be active if the source replica was a leader
+ if (watcher != null) {
+ try {
+ log.debug("Waiting for leader's replica to recover.");
+ if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+ String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
+ " on node=%s", coll.getName(), slice.getName(), targetNode);
+ log.warn(errorString);
+ results.add("failure", errorString);
+ return;
+ } else {
+ log.debug("Replica " + watcher.getRecoveredReplica() + " is active - deleting the source...");
+ }
+ } finally {
+ ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
+ }
+ }
ZkNodeProps removeReplicasProps = new ZkNodeProps(
COLLECTION_PROP, coll.getName(),
SHARD_ID_PROP, slice.getName(),
REPLICA_PROP, replica.getName());
- if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+ if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
NamedList deleteResult = new NamedList();
ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
if (deleteResult.get("failure") != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index a8d74e8..2c55f3c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -419,20 +419,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
- boolean deleted = false;
- while (! timeout.hasTimedOut()) {
- Thread.sleep(100);
- DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
- if(docCollection != null) {
+ // TODO: remove this workaround for SOLR-9440
+ zkStateReader.registerCore(collectionName);
+ try {
+ while (! timeout.hasTimedOut()) {
+ Thread.sleep(100);
+ DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
+ if (docCollection == null) { // someone already deleted the collection
+ return true;
+ }
Slice slice = docCollection.getSlice(shard);
if(slice == null || slice.getReplica(replicaName) == null) {
- deleted = true;
+ return true;
}
}
- // Return true if either someone already deleted the collection/slice/replica.
- if (docCollection == null || deleted) break;
+ // replica still exists after the timeout
+ return false;
+ } finally {
+ zkStateReader.unregisterCore(collectionName);
}
- return deleted;
}
void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index 5adbe8c..ba60908 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -93,15 +93,6 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
for (ZkNodeProps sourceReplica : sourceReplicas) {
- if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
- String shardName = sourceReplica.getStr(SHARD_ID_PROP);
- String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
- String collectionName = sourceReplica.getStr(COLLECTION_PROP);
- String key = collectionName + "_" + replicaName;
- RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
- watchers.put(key, watcher);
- zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
- }
NamedList nl = new NamedList();
log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -128,6 +119,16 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
if (addedReplica != null) {
createdReplicas.add(addedReplica);
+ if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
+ String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+ String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+ String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+ String key = collectionName + "_" + replicaName;
+ RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName,
+ addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
+ watchers.put(key, watcher);
+ zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ }
}
}
@@ -208,16 +209,27 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
// we use this watcher to wait for replicas to recover
- private static class RecoveryWatcher implements CollectionStateWatcher {
+ static class RecoveryWatcher implements CollectionStateWatcher {
String collectionId;
String shardId;
String replicaId;
+ String targetCore;
CountDownLatch countDownLatch;
+ Replica recovered;
- RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) {
+ /**
+ * Watch for recovery of a replica
+ * @param collectionId collection name
+ * @param shardId shard id
+ * @param replicaId source replica name (coreNodeName)
+ * @param targetCore specific target core name - if null then any active replica will do
+ * @param countDownLatch countdown when recovered
+ */
+ RecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, CountDownLatch countDownLatch) {
this.collectionId = collectionId;
this.shardId = shardId;
this.replicaId = replicaId;
+ this.targetCore = targetCore;
this.countDownLatch = countDownLatch;
}
@@ -241,7 +253,12 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
continue;
}
// check its state
+ String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+ if (targetCore != null && !targetCore.equals(coreName)) {
+ continue;
+ }
if (replica.isActive(liveNodes)) { // recovered - stop waiting
+ recovered = replica;
countDownLatch.countDown();
return true;
}
@@ -250,5 +267,9 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
// set the watch again to wait for the new replica to recover
return false;
}
+
+ public Replica getRecoveredReplica() {
+ return recovered;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
new file mode 100644
index 0000000..884d49e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaHDFSTest.java
@@ -0,0 +1,53 @@
+package org.apache.solr.cloud;
+
+import com.carrotsearch.randomizedtesting.ThreadFilter;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.solr.cloud.hdfs.HdfsTestUtil;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.util.BadHdfsThreadsFilter;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+/**
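+ * Runs the tests inherited from {@link MoveReplicaTest} after starting a MiniDFSCluster, uploading the "cloud-hdfs" configset as "conf1" and pointing solr.hdfs.home at the DFS data dir.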
+ */
+@ThreadLeakFilters(defaultFilters = true, filters = {
+ BadHdfsThreadsFilter.class, // hdfs currently leaks thread(s)
+ MoveReplicaHDFSTest.ForkJoinThreadsFilter.class
+})
+public class MoveReplicaHDFSTest extends MoveReplicaTest {
+
+ private static MiniDFSCluster dfsCluster;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+ dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
+
+ ZkConfigManager configManager = new ZkConfigManager(zkClient());
+ configManager.uploadConfigDir(configset("cloud-hdfs"), "conf1");
+
+ System.setProperty("solr.hdfs.home", HdfsTestUtil.getDataDir(dfsCluster, "data"));
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+ cluster.shutdown(); // need to close before the MiniDFSCluster
+ HdfsTestUtil.teardownClass(dfsCluster);
+ dfsCluster = null;
+ }
+
+
+ public static class ForkJoinThreadsFilter implements ThreadFilter {
+
+ @Override
+ public boolean reject(Thread t) {
+ String name = t.getName();
+ if (name.startsWith("ForkJoinPool.commonPool")) {
+ return true;
+ }
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bc37e8b4/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index 4368fea..8f00431 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -31,8 +32,10 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -40,6 +43,7 @@ import org.slf4j.LoggerFactory;
public class MoveReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
@@ -56,10 +60,11 @@ public class MoveReplicaTest extends SolrCloudTestCase {
cluster.waitForAllNodes(5000);
String coll = "movereplicatest_coll";
log.info("total_jettys: " + cluster.getJettySolrRunners().size());
+ int REPLICATION = 2;
CloudSolrClient cloudClient = cluster.getSolrClient();
- CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, 2);
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
create.setMaxShardsPerNode(2);
cloudClient.request(create);
@@ -94,16 +99,87 @@ public class MoveReplicaTest extends SolrCloudTestCase {
break;
}
assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
- Thread.sleep(50);
+ Thread.sleep(500);
}
assertTrue(success);
checkNumOfCores(cloudClient, replica.getNodeName(), 0);
- checkNumOfCores(cloudClient, targetNode, 2);
+ assertTrue("should be at least one core on target node!", getNumOfCores(cloudClient, targetNode) > 0);
+ // wait for recovery
+ boolean recovered = false;
+ for (int i = 0; i < 300; i++) {
+ DocCollection collState = getCollectionState(coll);
+ log.debug("###### " + collState);
+ Collection<Replica> replicas = collState.getSlice(shardId).getReplicas();
+ boolean allActive = true;
+ boolean hasLeaders = true;
+ if (replicas != null && !replicas.isEmpty()) {
+ for (Replica r : replicas) {
+ if (!r.getNodeName().equals(targetNode)) {
+ continue;
+ }
+ if (!r.isActive(Collections.singleton(targetNode))) {
+ log.info("Not active: " + r);
+ allActive = false;
+ }
+ }
+ } else {
+ allActive = false;
+ }
+ for (Slice slice : collState.getSlices()) {
+ if (slice.getLeader() == null) {
+ hasLeaders = false;
+ }
+ }
+ if (allActive && hasLeaders) {
+ // check the number of active replicas
+ assertEquals("total number of replicas", REPLICATION, replicas.size());
+ recovered = true;
+ break;
+ } else {
+ log.info("--- waiting, allActive=" + allActive + ", hasLeaders=" + hasLeaders);
+ Thread.sleep(1000);
+ }
+ }
+ assertTrue("replica never fully recovered", recovered);
moveReplica = new CollectionAdminRequest.MoveReplica(coll, shardId, targetNode, replica.getNodeName());
moveReplica.process(cloudClient);
checkNumOfCores(cloudClient, replica.getNodeName(), 1);
- checkNumOfCores(cloudClient, targetNode, 1);
+ // wait for recovery
+ recovered = false;
+ for (int i = 0; i < 300; i++) {
+ DocCollection collState = getCollectionState(coll);
+ log.debug("###### " + collState);
+ Collection<Replica> replicas = collState.getSlice(shardId).getReplicas();
+ boolean allActive = true;
+ boolean hasLeaders = true;
+ if (replicas != null && !replicas.isEmpty()) {
+ for (Replica r : replicas) {
+ if (!r.getNodeName().equals(replica.getNodeName())) {
+ continue;
+ }
+ if (!r.isActive(Collections.singleton(replica.getNodeName()))) {
+ log.info("Not active yet: " + r);
+ allActive = false;
+ }
+ }
+ } else {
+ allActive = false;
+ }
+ for (Slice slice : collState.getSlices()) {
+ if (slice.getLeader() == null) {
+ hasLeaders = false;
+ }
+ }
+ if (allActive && hasLeaders) {
+ assertEquals("total number of replicas", REPLICATION, replicas.size());
+ recovered = true;
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ assertTrue("replica never fully recovered", recovered);
}
private Replica getRandomReplica(String coll, CloudSolrClient cloudClient) {