Posted to commits@lucene.apache.org by ab...@apache.org on 2017/06/13 15:56:32 UTC

lucene-solr:branch_6x: SOLR-10704 Wait until all leader replicas are recovered before deleting the originals.

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_6x 519146204 -> ccd1f45b3


SOLR-10704 Wait until all leader replicas are recovered before deleting
the originals.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ccd1f45b
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ccd1f45b
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ccd1f45b

Branch: refs/heads/branch_6x
Commit: ccd1f45b3ba3fbb862cae5d0ab0ce821b966026a
Parents: 5191462
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jun 13 17:36:51 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jun 13 17:56:17 2017 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  3 +
 .../org/apache/solr/cloud/ReplaceNodeCmd.java   | 93 +++++++++++++++++++-
 .../org/apache/solr/cloud/ReplaceNodeTest.java  |  9 +-
 3 files changed, 100 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccd1f45b/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 568353e..0ba7795 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -122,6 +122,9 @@ Bug Fixes
 
 * SOLR-10835: Add support for point fields in Export Handler (Tomás Fernández Löbbe)
 
+* SOLR-10704: REPLACENODE may cause data loss when replicationFactor is 1. (ab, shalin)
+
+
 Optimizations
 ----------------------
 * SOLR-10634: JSON Facet API: When a field/terms facet will retrieve all buckets (i.e. limit:-1)
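
For reference, the REPLACENODE action that this fix hardens is invoked through the Collections API. A minimal SolrJ sketch follows; the client variable, the node names, and the explicit timeout parameter are illustrative assumptions (the command itself reads "timeout" from the overseer message with a 10-minute default, as the ReplaceNodeCmd diff below shows), so treat this as a sketch rather than the documented API surface.

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;

// Sketch: ask the Overseer to move all replicas off "source" onto "target".
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", "REPLACENODE");
params.set("source", "host1:8983_solr");   // node being decommissioned (placeholder)
params.set("target", "host2:8983_solr");   // node that receives the new replicas (placeholder)
params.set("timeout", "600");              // seconds; assumed pass-through of the new parameter
// "client" is assumed to be an existing CloudSolrClient pointed at the cluster.
client.request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/collections", params));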

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccd1f45b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index 92c9afe..797d3fd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -20,15 +20,18 @@ package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -60,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     String source = message.getStr("source");
     String target = message.getStr("target");
     String async = message.getStr("async");
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
     boolean parallel = message.getBool("parallel", false);
     ClusterState clusterState = zkStateReader.getClusterState();
 
@@ -70,13 +74,34 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
     }
     List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-
+    // how many leaders are we moving? for these replicas we have to make sure that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can become the new leader)
+    int numLeaders = 0;
+    for (ZkNodeProps props : sourceReplicas) {
+      if (props.getBool(ZkStateReader.LEADER_PROP, false)) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, RecoveryWatcher> watchers = new HashMap<>();
     List<ZkNodeProps> createdReplicas = new ArrayList<>();
 
     AtomicBoolean anyOneFailed = new AtomicBoolean(false);
     CountDownLatch countDownLatch = new CountDownLatch(sourceReplicas.size());
 
+    CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
+
     for (ZkNodeProps sourceReplica : sourceReplicas) {
+      if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
+        String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+        String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+        String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+        String key = collectionName + "_" + replicaName;
+        RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
+        watchers.put(key, watcher);
+        zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+      }
       NamedList nl = new NamedList();
       log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
       ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -106,10 +131,26 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       }
     }
 
-    log.debug("Waiting for replace node action to complete");
-    countDownLatch.await(5, TimeUnit.MINUTES);
-    log.debug("Finished waiting for replace node action to complete");
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
 
+    // now wait for leader replicas to recover
+    log.debug("Waiting for " + numLeaders + " leader replicas to recover");
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for leader replicas to recover");
+    }
+    // remove the watchers, we're done either way
+    for (RecoveryWatcher watcher : watchers.values()) {
+      zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher);
+    }
     if (anyOneFailed.get()) {
       log.info("Failed to create some replicas. Cleaning up all replicas on target node");
       CountDownLatch cleanupLatch = new CountDownLatch(createdReplicas.size());
@@ -134,6 +175,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
         }
       }
       cleanupLatch.await(5, TimeUnit.MINUTES);
+      return;
     }
 
 
@@ -154,6 +196,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
                 SHARD_ID_PROP, slice.getName(),
                 ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
                 ZkStateReader.REPLICA_PROP, replica.getName(),
+                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
                 CoreAdminParams.NODE, source);
             sourceReplicas.add(props
             );
@@ -163,4 +206,46 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     }
     return sourceReplicas;
   }
+
+  // we use this watcher to wait for replicas to recover
+  private static class RecoveryWatcher implements CollectionStateWatcher {
+    String collectionId;
+    String shardId;
+    String replicaId;
+    CountDownLatch countDownLatch;
+
+    RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) {
+      this.collectionId = collectionId;
+      this.shardId = shardId;
+      this.replicaId = replicaId;
+      this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+      if (collectionState == null) { // collection has been deleted - don't wait
+        countDownLatch.countDown();
+        return true;
+      }
+      Slice slice = collectionState.getSlice(shardId);
+      if (slice == null) { // shard has been removed - don't wait
+        countDownLatch.countDown();
+        return true;
+      }
+      for (Replica replica : slice.getReplicas()) {
+        // check if another replica exists - doesn't have to be the one we're moving
+        // as long as it's active and can become a leader, in which case we don't have to wait
+        // for recovery of specifically the one that we've just added
+        if (!replica.getName().equals(replicaId)) {
+          // check its state
+          if (replica.isActive(liveNodes)) { // recovered - stop waiting
+            countDownLatch.countDown();
+            return true;
+          }
+        }
+      }
+      // set the watch again to wait for the new replica to recover
+      return false;
+    }
+  }
 }
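
The RecoveryWatcher added above registers itself via ZkStateReader.registerCollectionStateWatcher and counts down the latch as soon as any active replica of the shard other than the one being moved shows up, so the leader can fail over safely before the original is deleted. The same check can be expressed with ZkStateReader.waitForState and a CollectionStatePredicate from the same watcher API; the sketch below is illustrative only (zkStateReader, collection, shard and movedReplicaName are placeholder names, and the 10-minute wait simply mirrors the command's default timeout), not part of this commit.

// Sketch: block until some replica of the shard, other than the one being moved, is active.
// waitForState throws TimeoutException if the predicate never matches within the wait period.
zkStateReader.waitForState(collection, 10, TimeUnit.MINUTES, (liveNodes, collectionState) -> {
  if (collectionState == null) return true;            // collection deleted - stop waiting
  Slice slice = collectionState.getSlice(shard);
  if (slice == null) return true;                      // shard removed - stop waiting
  for (Replica replica : slice.getReplicas()) {
    if (!replica.getName().equals(movedReplicaName) && replica.isActive(liveNodes)) {
      return true;                                     // another active replica can take over as leader
    }
  }
  return false;                                        // keep waiting for recovery
});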

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ccd1f45b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 1c7575d..fa9ae64 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -60,7 +60,14 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     Collections.shuffle(l, random());
     String emptyNode = l.remove(0);
     String node2bdecommissioned = l.get(0);
-    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 5, 2);
+    CollectionAdminRequest.Create create;
+    // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
+    // have to worry about null checking when comparing the Create command with the final Slices
+    create = pickRandom(
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2),
+                        // check also replicationFactor 1
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1)
+    );
     create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
     cloudClient.request(create);
     log.info("excluded_node : {}  ", emptyNode);