Posted to commits@lucene.apache.org by ho...@apache.org on 2017/06/14 02:02:57 UTC
[21/35] lucene-solr:jira/SOLR-10834: SOLR-10704 Wait until all leader replicas are recovered before deleting the originals.

SOLR-10704 Wait until all leader replicas are recovered before deleting the originals.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/232eff08
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/232eff08
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/232eff08
Branch: refs/heads/jira/SOLR-10834
Commit: 232eff0893bccb93d01042f26a00e50870be2f29
Parents: f29e2d1
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jun 13 17:36:51 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jun 13 17:39:26 2017 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/cloud/ReplaceNodeCmd.java | 96 +++++++++++++++++++-
.../org/apache/solr/cloud/ReplaceNodeTest.java | 9 +-
3 files changed, 102 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232eff08/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 153ed82..d2a26c0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -364,6 +364,9 @@ Bug Fixes
* SOLR-10835: Add support for point fields in Export Handler (Tomás Fernández Löbbe)
+* SOLR-10704: REPLACENODE may cause data loss when replicationFactor is 1. (ab, shalin)
+
+
Optimizations
----------------------
* SOLR-10634: JSON Facet API: When a field/terms facet will retrieve all buckets (i.e. limit:-1)
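For context: REPLACENODE is invoked through the Collections API, and this commit adds a "timeout" knob (in seconds, defaulting to 10 * 60 = 600) that ReplaceNodeCmd reads below. A minimal SolrJ sketch of such a call, assuming the HTTP parameter names match the message keys ("source", "target", "timeout") visible in the diff; the node names and base URL are placeholders:

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class ReplaceNodeExample {
  public static void main(String[] args) throws Exception {
    // Placeholder base URL and node names - substitute values from your cluster.
    try (HttpSolrClient client =
             new HttpSolrClient.Builder("http://localhost:8983/solr").build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("action", "REPLACENODE");
      params.set("source", "host1:8983_solr"); // node being decommissioned
      params.set("target", "host2:8983_solr"); // node receiving the new replicas
      params.set("timeout", "600");            // seconds; the knob added by this commit
      SolrRequest<?> request =
          new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/collections", params);
      NamedList<Object> response = client.request(request);
      System.out.println(response);
    }
  }
}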
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232eff08/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index e4240be..5adbe8c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -20,15 +20,18 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -60,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
String source = message.getStr("source");
String target = message.getStr("target");
String async = message.getStr("async");
+ int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
boolean parallel = message.getBool("parallel", false);
ClusterState clusterState = zkStateReader.getClusterState();
@@ -70,13 +74,34 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
}
List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-
+ // how many leaders are we moving? for these replicas we have to make sure that either:
+ // * another existing replica can become a leader, or
+ // * we wait until the newly created replica completes recovery (and can become the new leader)
+ int numLeaders = 0;
+ for (ZkNodeProps props : sourceReplicas) {
+ if (props.getBool(ZkStateReader.LEADER_PROP, false)) {
+ numLeaders++;
+ }
+ }
+ // map of collectionName_coreNodeName to watchers
+ Map<String, RecoveryWatcher> watchers = new HashMap<>();
List<ZkNodeProps> createdReplicas = new ArrayList<>();
AtomicBoolean anyOneFailed = new AtomicBoolean(false);
CountDownLatch countDownLatch = new CountDownLatch(sourceReplicas.size());
+ CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
+
for (ZkNodeProps sourceReplica : sourceReplicas) {
+ if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
+ String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+ String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+ String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+ String key = collectionName + "_" + replicaName;
+ RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
+ watchers.put(key, watcher);
+ zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ }
NamedList nl = new NamedList();
log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -106,10 +131,26 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
- log.debug("Waiting for replace node action to complete");
- countDownLatch.await(5, TimeUnit.MINUTES);
- log.debug("Finished waiting for replace node action to complete");
+ log.debug("Waiting for replicas to be added");
+ if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+ log.info("Timed out waiting for replicas to be added");
+ anyOneFailed.set(true);
+ } else {
+ log.debug("Finished waiting for replicas to be added");
+ }
+ // now wait for leader replicas to recover
+ log.debug("Waiting for " + numLeaders + " leader replicas to recover");
+ if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+ log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
+ anyOneFailed.set(true);
+ } else {
+ log.debug("Finished waiting for leader replicas to recover");
+ }
+ // remove the watchers, we're done either way
+ for (RecoveryWatcher watcher : watchers.values()) {
+ zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher);
+ }
if (anyOneFailed.get()) {
log.info("Failed to create some replicas. Cleaning up all replicas on target node");
CountDownLatch cleanupLatch = new CountDownLatch(createdReplicas.size());
@@ -134,6 +175,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
cleanupLatch.await(5, TimeUnit.MINUTES);
+ return;
}
@@ -155,6 +197,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
ZkStateReader.REPLICA_PROP, replica.getName(),
ZkStateReader.REPLICA_TYPE, replica.getType().name(),
+ ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
CoreAdminParams.NODE, source);
sourceReplicas.add(props);
}
@@ -163,4 +206,49 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
return sourceReplicas;
}
+
+ // we use this watcher to wait for replicas to recover
+ private static class RecoveryWatcher implements CollectionStateWatcher {
+ String collectionId;
+ String shardId;
+ String replicaId;
+ CountDownLatch countDownLatch;
+
+ RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) {
+ this.collectionId = collectionId;
+ this.shardId = shardId;
+ this.replicaId = replicaId;
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ if (collectionState == null) { // collection has been deleted - don't wait
+ countDownLatch.countDown();
+ return true;
+ }
+ Slice slice = collectionState.getSlice(shardId);
+ if (slice == null) { // shard has been removed - don't wait
+ countDownLatch.countDown();
+ return true;
+ }
+ for (Replica replica : slice.getReplicas()) {
+ // check if another replica exists - doesn't have to be the one we're moving
+ // as long as it's active and can become a leader, in which case we don't have to wait
+ // for recovery of specifically the one that we've just added
+ if (!replica.getName().equals(replicaId)) {
+ if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
+ continue;
+ }
+ // check its state
+ if (replica.isActive(liveNodes)) { // recovered - stop waiting
+ countDownLatch.countDown();
+ return true;
+ }
+ }
+ }
+ // set the watch again to wait for the new replica to recover
+ return false;
+ }
+ }
}
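The RecoveryWatcher above is the heart of the fix: one is registered per leader replica being moved, and on each state change it either counts down the shared latch and unregisters (return true) or keeps watching (return false). Here is the same watch-and-latch pattern as a minimal, dependency-free sketch; this is illustrative only, not Solr code, and the Notifier class stands in for ZkStateReader's watcher registry:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class WatchLatchSketch {
  // Like CollectionStateWatcher: returning true unregisters the watcher.
  interface Watcher { boolean onStateChanged(String replicaState); }

  static class Notifier {
    private final List<Watcher> watchers = new CopyOnWriteArrayList<>();
    void register(Watcher w) { watchers.add(w); }
    void publish(String state) {
      // Deliver the new state; drop watchers whose condition is now satisfied.
      watchers.removeIf(w -> w.onStateChanged(state));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Notifier notifier = new Notifier();
    CountDownLatch recovered = new CountDownLatch(1);

    // Analogous to RecoveryWatcher: count down once the replica reports ACTIVE.
    notifier.register(state -> {
      if ("ACTIVE".equals(state)) {
        recovered.countDown();
        return true;  // condition met - remove this watcher
      }
      return false;   // keep watching
    });

    // Simulate state updates arriving from ZooKeeper.
    new Thread(() -> {
      notifier.publish("DOWN");
      notifier.publish("RECOVERING");
      notifier.publish("ACTIVE");
    }).start();

    // Like ReplaceNodeCmd: a bounded wait rather than blocking forever.
    if (!recovered.await(10, TimeUnit.SECONDS)) {
      System.out.println("Timed out waiting for recovery");
    } else {
      System.out.println("Replica recovered - safe to delete the original");
    }
  }
}

Note that returning true from the callback is what removes a watcher, which is why ReplaceNodeCmd still loops over removeCollectionStateWatcher afterwards: on the timeout path, watchers that never fired would otherwise stay registered.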
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232eff08/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index ecfd3ee..d7fae92 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -67,11 +67,16 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
CollectionAdminRequest.Create create;
// NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
// have to worry about null checking when comparing the Create command with the final Slices
- create = pickRandom(CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
+ create = pickRandom(
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
- CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0));
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
+ // check also replicationFactor 1
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0),
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
+ );
create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
cloudClient.request(create);
log.info("excluded_node : {} ", emptyNode);