Posted to commits@lucene.apache.org by ho...@apache.org on 2017/06/14 02:02:57 UTC

[21/35] lucene-solr:jira/SOLR-10834: SOLR-10704 Wait until all leader replicas are recovered before deleting the originals.

SOLR-10704 Wait until all leader replicas are recovered before deleting
the originals.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/232eff08
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/232eff08
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/232eff08

Branch: refs/heads/jira/SOLR-10834
Commit: 232eff0893bccb93d01042f26a00e50870be2f29
Parents: f29e2d1
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue Jun 13 17:36:51 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue Jun 13 17:39:26 2017 +0200

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  3 +
 .../org/apache/solr/cloud/ReplaceNodeCmd.java   | 96 +++++++++++++++++++-
 .../org/apache/solr/cloud/ReplaceNodeTest.java  |  9 +-
 3 files changed, 102 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232eff08/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 153ed82..d2a26c0 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -364,6 +364,9 @@ Bug Fixes
 
 * SOLR-10835: Add support for point fields in Export Handler (Tomás Fernández Löbbe)
 
+* SOLR-10704: REPLACENODE may cause data loss when replicationFactor is 1. (ab, shalin)
+
+
 Optimizations
 ----------------------
 * SOLR-10634: JSON Facet API: When a field/terms facet will retrieve all buckets (i.e. limit:-1)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232eff08/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
index e4240be..5adbe8c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
@@ -20,15 +20,18 @@ package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -60,6 +63,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     String source = message.getStr("source");
     String target = message.getStr("target");
     String async = message.getStr("async");
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
     boolean parallel = message.getBool("parallel", false);
     ClusterState clusterState = zkStateReader.getClusterState();
 
@@ -70,13 +74,34 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
     }
     List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-
+    // how many leaders are we moving? for these replicas we have to make sure that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can become the new leader)
+    int numLeaders = 0;
+    for (ZkNodeProps props : sourceReplicas) {
+      if (props.getBool(ZkStateReader.LEADER_PROP, false)) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, RecoveryWatcher> watchers = new HashMap<>();
     List<ZkNodeProps> createdReplicas = new ArrayList<>();
 
     AtomicBoolean anyOneFailed = new AtomicBoolean(false);
     CountDownLatch countDownLatch = new CountDownLatch(sourceReplicas.size());
 
+    CountDownLatch replicasToRecover = new CountDownLatch(numLeaders);
+
     for (ZkNodeProps sourceReplica : sourceReplicas) {
+      if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false)) {
+        String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+        String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+        String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+        String key = collectionName + "_" + replicaName;
+        RecoveryWatcher watcher = new RecoveryWatcher(collectionName, shardName, replicaName, replicasToRecover);
+        watchers.put(key, watcher);
+        zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+      }
       NamedList nl = new NamedList();
       log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
       ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
@@ -106,10 +131,26 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       }
     }
 
-    log.debug("Waiting for replace node action to complete");
-    countDownLatch.await(5, TimeUnit.MINUTES);
-    log.debug("Finished waiting for replace node action to complete");
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
 
+    // now wait for leader replicas to recover
+    log.debug("Waiting for " + numLeaders + " leader replicas to recover");
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for leader replicas to recover");
+    }
+    // remove the watchers, we're done either way
+    for (RecoveryWatcher watcher : watchers.values()) {
+      zkStateReader.removeCollectionStateWatcher(watcher.collectionId, watcher);
+    }
     if (anyOneFailed.get()) {
       log.info("Failed to create some replicas. Cleaning up all replicas on target node");
       CountDownLatch cleanupLatch = new CountDownLatch(createdReplicas.size());
@@ -134,6 +175,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
         }
       }
       cleanupLatch.await(5, TimeUnit.MINUTES);
+      return;
     }
 
 
@@ -155,6 +197,7 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
                 ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
                 ZkStateReader.REPLICA_PROP, replica.getName(),
                 ZkStateReader.REPLICA_TYPE, replica.getType().name(),
+                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
                 CoreAdminParams.NODE, source);
             sourceReplicas.add(props);
           }
@@ -163,4 +206,49 @@ public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
     }
     return sourceReplicas;
   }
+
+  // we use this watcher to wait for replicas to recover
+  private static class RecoveryWatcher implements CollectionStateWatcher {
+    String collectionId;
+    String shardId;
+    String replicaId;
+    CountDownLatch countDownLatch;
+
+    RecoveryWatcher(String collectionId, String shardId, String replicaId, CountDownLatch countDownLatch) {
+      this.collectionId = collectionId;
+      this.shardId = shardId;
+      this.replicaId = replicaId;
+      this.countDownLatch = countDownLatch;
+    }
+
+    @Override
+    public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+      if (collectionState == null) { // collection has been deleted - don't wait
+        countDownLatch.countDown();
+        return true;
+      }
+      Slice slice = collectionState.getSlice(shardId);
+      if (slice == null) { // shard has been removed - don't wait
+        countDownLatch.countDown();
+        return true;
+      }
+      for (Replica replica : slice.getReplicas()) {
+        // check if another replica exists - doesn't have to be the one we're moving
+        // as long as it's active and can become a leader, in which case we don't have to wait
+        // for recovery of specifically the one that we've just added
+        if (!replica.getName().equals(replicaId)) {
+          if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
+            continue;
+          }
+          // check its state
+          if (replica.isActive(liveNodes)) { // recovered - stop waiting
+            countDownLatch.countDown();
+            return true;
+          }
+        }
+      }
+      // set the watch again to wait for the new replica to recover
+      return false;
+    }
+  }
 }
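
For context (not part of this commit's diff): the code path above is driven by the Collections API REPLACENODE action. The sketch below is a hypothetical SolrJ invocation under stated assumptions: the ZooKeeper address and node names are placeholders, the "source"/"target" parameter names mirror the fields read in ReplaceNodeCmd above, and it is an assumption (not shown in this diff) that the Collections API handler forwards the optional "timeout" value, in seconds, into the overseer message.

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.GenericSolrRequest;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;

public class ReplaceNodeExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical ZooKeeper address and node names; adjust for a real cluster.
    try (CloudSolrClient client = new CloudSolrClient.Builder()
        .withZkHost("localhost:9983").build()) {
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("action", "REPLACENODE");
      params.set("source", "127.0.0.1:8983_solr");  // node being decommissioned
      params.set("target", "127.0.0.1:7574_solr");  // node that receives the new replicas
      // "timeout" (seconds) is read by ReplaceNodeCmd in this commit; whether the
      // Collections API handler forwards it from the HTTP request is an assumption.
      params.set("timeout", "600");
      SolrRequest<?> request = new GenericSolrRequest(
          SolrRequest.METHOD.GET, "/admin/collections", params);
      NamedList<Object> response = client.request(request);
      System.out.println(response);
    }
  }
}

With replicationFactor=1 there is no second replica that can take over leadership, so ReplaceNodeCmd now registers a RecoveryWatcher per leader replica and waits on the replicasToRecover latch before deleting the originals; if the wait times out it cleans up the partially created replicas on the target node and returns instead of proceeding with the deletes.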

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/232eff08/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index ecfd3ee..d7fae92 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -67,11 +67,16 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     CollectionAdminRequest.Create create;
     // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
     // have to worry about null checking when comparing the Create command with the final Slices
-    create = pickRandom(CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
+    create = pickRandom(
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
                         CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
                         CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
                         CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
-                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0));
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
+                        // check also replicationFactor 1
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0),
+                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
+    );
     create.setCreateNodeSet(StrUtils.join(l, ',')).setMaxShardsPerNode(3);
     cloudClient.request(create);
     log.info("excluded_node : {}  ", emptyNode);