You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/30 03:10:06 UTC

[lucene-solr] branch reference_impl_dev updated: @1079 Harden delete replica.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new dde24df  @1079 Harden delete replica.
dde24df is described below

commit dde24dfa70242cb68114e1daf4d654d90add84ce
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Oct 29 21:57:53 2020 -0500

    @1079 Harden delete replica.
---
 .../cloud/api/collections/DeleteReplicaCmd.java    | 95 ++++++++++++++++++----
 1 file changed, 77 insertions(+), 18 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
index bb4d118..4cb628a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
@@ -64,23 +66,45 @@ public class DeleteReplicaCmd implements Cmd {
   @SuppressWarnings("unchecked")
 
   public Runnable call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
-    deleteReplica(clusterState, message, results,null);
-    return null;
+    AddReplicaCmd.Response response = deleteReplica(clusterState, message, results, null);
+    if (response == null) return null;
+    return response.asyncFinalRunner;
   }
 
 
   @SuppressWarnings("unchecked")
-  void deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
+  AddReplicaCmd.Response deleteReplica(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete)
           throws KeeperException, InterruptedException {
 
     log.info("deleteReplica() : {}", Utils.toJSONString(message));
 
-    boolean parallel = message.getBool("parallel", false);
-
     //If a count is specified the strategy needs be different
     if (message.getStr(COUNT_PROP) != null) {
-      deleteReplicaBasedOnCount(clusterState, message, results, onComplete, parallel);
-      return;
+      Set<String> pickReplicasTobeDeleted = deleteReplicaBasedOnCount(clusterState, message, results, onComplete);
+      if (pickReplicasTobeDeleted != null && pickReplicasTobeDeleted.size() > 0) {
+        if (results.get("failure") == null && results.get("exception") == null) {
+          String collectionName = message.getStr(COLLECTION_PROP);
+          String shard = message.getStr(SHARD_ID_PROP);
+          AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+          pickReplicasTobeDeleted.forEach(replicaName -> {
+
+            response.asyncFinalRunner = new Runnable() {
+              @Override
+              public void run() {
+                try {
+                  waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
+                } catch (Exception e) {
+                  log.error("", e);
+                }
+              }
+            };
+          });
+          return response;
+        }
+        return null;
+      } else {
+        return null;
+      }
     }
 
     ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
@@ -103,20 +127,37 @@ public class DeleteReplicaCmd implements Cmd {
               "Invalid shard name : " +  shard + " in collection : " +  collectionName);
     }
 
-    deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete,  parallel);
+    deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete);
+
+    AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+
+    if (results.get("failure") == null && results.get("exception") == null) {
+      response.asyncFinalRunner = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            waitForCoreNodeGone(collectionName, shard, replicaName, 30000);
+          } catch (Exception e) {
+            log.error("", e);
+          }
+        }
+      };
+    }
+
+    return response;
   }
 
 
   /**
    * Delete replicas based on count for a given collection. If a shard is passed, uses that
    * else deletes given num replicas across all shards for the given collection.
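+   * @return the replica names picked for deletion in the last shard processed (not a union across shards); may be null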
    */
   @SuppressWarnings({"unchecked"})
-  void deleteReplicaBasedOnCount(ClusterState clusterState,
+  Set<String> deleteReplicaBasedOnCount(ClusterState clusterState,
                                  ZkNodeProps message,
                                  @SuppressWarnings({"rawtypes"})NamedList results,
-                                 Runnable onComplete,
-                                 boolean parallel)
+                                 Runnable onComplete)
           throws KeeperException, InterruptedException {
     ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
     int count = Integer.parseInt(message.getStr(COUNT_PROP));
@@ -132,22 +173,22 @@ public class DeleteReplicaCmd implements Cmd {
                 "Invalid shard name : " +  shard +  " in collection : " + collectionName);
       }
     }
-
+    Set<String> replicasToBeDeleted = null;
     Map<Slice, Set<String>> shardToReplicasMapping = new HashMap<Slice, Set<String>>();
     if (slice != null) {
-      Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(slice, shard, collectionName, count);
+      replicasToBeDeleted = pickReplicasTobeDeleted(slice, shard, collectionName, count);
       shardToReplicasMapping.put(slice,replicasToBeDeleted);
     } else {
 
       //If there are many replicas left, remove the rest based on count.
       Collection<Slice> allSlices = coll.getSlices();
       for (Slice individualSlice : allSlices) {
-        Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count);
+        replicasToBeDeleted = pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count);
         shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
       }
     }
 
-    try (ParWork worker = new ParWork(this)) {
+    try (ParWork worker = new ParWork(this, false, true)) {
 
       for (Map.Entry<Slice,Set<String>> entry : shardToReplicasMapping.entrySet()) {
         Slice shardSlice = entry.getKey();
@@ -156,13 +197,13 @@ public class DeleteReplicaCmd implements Cmd {
         // callDeleteReplica on all replicas
         for (String replica : replicas) {
           if (log.isDebugEnabled()) log.debug("Deleting replica {}  for shard {} based on count {}", replica, shardId, count);
-          worker.collect("deleteCore", () -> { deleteCore(shardSlice, collectionName, replica, message, shard, results, onComplete, parallel); return replica; });
+          worker.collect("deleteCore", () -> { deleteCore(shardSlice, collectionName, replica, message, shard, results, onComplete); return replica; });
         }
         results.add("shard_id", shardId);
         results.add("replicas_deleted", replicas);
       }
     }
-
+    return replicasToBeDeleted;
   }
 
 
@@ -215,7 +256,7 @@ public class DeleteReplicaCmd implements Cmd {
   }
 
   @SuppressWarnings({"unchecked"})
-  void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException {
+  void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, @SuppressWarnings({"rawtypes"})NamedList results, Runnable onComplete) throws KeeperException, InterruptedException {
     log.info("delete core {}", replicaName);
     Replica replica = slice.getReplica(replicaName);
     if (replica == null) {
@@ -270,4 +311,22 @@ public class DeleteReplicaCmd implements Cmd {
       throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
     }
   }
+
+  boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
+    try {
+      ocmh.zkStateReader.waitForState(collectionName, timeoutms, TimeUnit.MILLISECONDS, (c) -> {
+        if (c == null)
+          return true;
+        Slice slice = c.getSlice(shard);
+        if(slice == null || slice.getReplica(replicaName) == null) {
+          return true;
+        }
+        return false;
+      });
+    } catch (TimeoutException e) {
+      return false;
+    }
+
+    return true;
+  }
 }