You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/06/13 15:56:32 UTC
lucene-solr:branch_6x: SOLR-10704 Wait until all leader replicas are
recovered before deleting the originals.
Repository: lucene-solr
Updated Branches:
refs/heads/branch_6x 519146204 -> ccd1f45b3
SOLR-10704 Wait until all leader replicas are recovered before deleting
the originals.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ccd1f45b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ccd1f45b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ccd1f45b
Branch: refs/heads/branch_6x
Commit: ccd1f45b3ba3fbb862cae5d0ab0ce821b966026a
Parents: 5191462
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jun 13 17:36:51 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jun 13 17:56:17 2017 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/cloud/ReplaceNodeCmd.java | 93 +++++++++++++++++++-
.../org/apache/solr/cloud/ReplaceNodeTest.java | 9 +-
3 files changed, 100 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccd1f45b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 568353e..0ba7795 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -122,6 +122,9 @@ Bug Fixes
* SOLR-10835: Add support for point fields in Export Handler (Tomás Fernández Löbbe)
+* SOLR-10704: REPLACENODE may cause data loss when replicationFactor is 1. (ab, shalin)
+
+
Optimizations
----------------------
* SOLR-10634: JSON Facet API: When a field/terms facet will retrieve all buckets (i.e. limit:-1)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccd1f45b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index 92c9afe..797d3fd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -20,15 +20,18 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -60,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
String source = message.getStr("source");
String target = message.getStr("target");
String async = message.getStr("async");
+ int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
boolean parallel = message.getBool("parallel", false);
ClusterState clusterState = zkStateReader.getClusterState();
@@ -70,13 +74,34 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
}
List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-
+ // how many leaders are we moving? for these replicas we have to make sure that either:
+ // * another existing replica can become a leader, or
+ // * we wait until the newly created replica completes recovery (and can become the new leader)
+ int numLeaders = 0;
+ for (ZkNodeProps props : sourceReplicas) {
+ if (props.getBool(ZkStateReader.LEADER_PROP, false)) {
+ numLeaders++;
+ }
+ }
+ // map of collectionName_coreNodeName to watchers
+ Map<String, RecoveryWatcher> watchers = new HashMap<>();
List<ZkNodeProps> createdReplicas = new ArrayList<>();
AtomicBoolean anyOneFailed = new AtomicBoolean(false);
CountDownLatch countDownLatch = new CountDownLatch(sourceReplicas.size());
+ CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
+
for (ZkNodeProps sourceReplica : sourceReplicas) {
+ if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
+ String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+ String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+ String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+ String key = collectionName + "_" + replicaName;
+ RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
+ watchers.put(key, watcher);
+ zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ }
NamedList nl = new NamedList();
log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -106,10 +131,26 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
- log.debug("Waiting for replace node action to complete");
- countDownLatch.await(5, TimeUnit.MINUTES);
- log.debug("Finished waiting for replace node action to complete");
+ log.debug("Waiting for replicas to be added");
+ if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+ log.info("Timed out waiting for replicas to be added");
+ anyOneFailed.set(true);
+ } else {
+ log.debug("Finished waiting for replicas to be added");
+ }
+ // now wait for leader replicas to recover
+ log.debug("Waiting for " + numLeaders + " leader replicas to recover");
+ if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+ log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
+ anyOneFailed.set(true);
+ } else {
+ log.debug("Finished waiting for leader replicas to recover");
+ }
+ // remove the watchers, we're done either way
+ for (RecoveryWatcher watcher : watchers.values()) {
+ zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher);
+ }
if (anyOneFailed.get()) {
log.info("Failed to create some replicas. Cleaning up all replicas on target node");
CountDownLatch cleanupLatch = new CountDownLatch(createdReplicas.size());
@@ -134,6 +175,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
}
cleanupLatch.await(5, TimeUnit.MINUTES);
+ return;
}
@@ -154,6 +196,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
SHARD_ID_PROP, slice.getName(),
ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
ZkStateReader.REPLICA_PROP, replica.getName(),
+ ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
CoreAdminParams.NODE, source);
sourceReplicas.add(props
);
@@ -163,4 +206,46 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
}
return sourceReplicas;
}
+
+ // Collection state watcher that counts down a shared latch once the watched shard no longer depends on the replica being moved (see onStateChanged).
+ private static class RecoveryWatcher implements CollectionStateWatcher {
+ String collectionId; // collection this watcher is registered for; read back by the caller when removing the watcher
+ String shardId; // name of the shard (slice) to inspect on each state change
+ String replicaId; // name of the replica to ignore when looking for an active replica (the caller passes the source replica being moved)
+ CountDownLatch countDownLatch; // latch counted down once when this watcher decides there is nothing more to wait for
+
+ RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) { // stores the arguments as-is, no validation
+ this.collectionId = collectionId;
+ this.shardId = shardId;
+ this.replicaId = replicaId;
+ this.countDownLatch = countDownLatch;
+ }
+
+ @Override
+ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) { // returns true when waiting is over, false to keep watching
+ if (collectionState == null) { // collection has been deleted - nothing left to wait for
+ countDownLatch.countDown();
+ return true;
+ }
+ Slice slice = collectionState.getSlice(shardId);
+ if (slice == null) { // shard has been removed - nothing left to wait for
+ countDownLatch.countDown();
+ return true;
+ }
+ for (Replica replica : slice.getReplicas()) {
+ // Look for any replica in this shard other than the one named by replicaId. It may be a
+ // pre-existing replica or the newly added one; as long as it is active (and so presumably able
+ // to become leader - TODO confirm) we don't have to wait for the new replica specifically.
+ if (!replica.getName().equals(replicaId)) {
+ // only an active replica on a live node counts
+ if (replica.isActive(liveNodes)) { // an active alternative exists - stop waiting
+ countDownLatch.countDown();
+ return true;
+ }
+ }
+ }
+ // no other active replica yet - keep the watch set and wait for the new replica to recover
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccd1f45b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 1c7575d..fa9ae64 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -60,7 +60,14 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
Collections.shuffle(l, random());
String emptyNode = l.remove(0);
String node2bdecommissioned = l.get(0);
- CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 5, 2);
+ CollectionAdminRequest.Create create;
+ // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
+ // have to worry about null checking when comparing the Create command with the final Slices
+ create = pickRandom(
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 2),
+ // check also replicationFactor 1
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 1)
+ );
create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
cloudClient.request(create);
log.info("excluded_node : {} ", emptyNode);