You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/09 16:11:53 UTC

[lucene-solr] branch reference_impl_dev updated: @1120 Finishing DeleteCollectionCmd.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new dc9ff2f  @1120 Finishing DeleteCollectionCmd.
dc9ff2f is described below

commit dc9ff2f38c82861a4cbd9f873d103a446710992e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 10:11:21 2020 -0600

    @1120 Finishing DeleteCollectionCmd.
---
 .../java/org/apache/solr/cloud/ZkShardTerms.java   |  4 ++
 .../cloud/api/collections/DeleteCollectionCmd.java | 56 +++++++++++++++++-----
 .../OverseerCollectionMessageHandler.java          | 29 ++++++++---
 3 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index a44795a..6fedd89 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -338,6 +338,10 @@ public class ZkShardTerms implements AutoCloseable{
       Stat stat = new Stat();
       byte[] data = zkClient.getData(znodePath, null, stat, true);
       newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+    } catch (KeeperException.NoNodeException e) {
+      if (log.isDebugEnabled()) log.debug("No node found for refresh terms", e);
+      // we have likely been deleted
+      return;
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 1666912..f86dd55 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -27,6 +27,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -38,7 +39,9 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.handler.admin.MetricsHistoryHandler;
+import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +63,12 @@ import java.util.stream.Collectors;
 
 public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  static Set<String> okayExceptions = new HashSet<>(1);
+  static {
+    okayExceptions.add(NonExistentCoreException.class.getName());
+  }
+
   private final OverseerCollectionMessageHandler ocmh;
   private final TimeSource timeSource;
 
@@ -77,7 +86,8 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     }
     final String extCollection = message.getStr(NAME);
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-
+    ShardHandler shardHandler = null;
+    OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = null;
     boolean skipFinalStateWork = false;
 
     if (zkStateReader.aliasesManager != null) { // not a mock ZkStateReader
@@ -122,8 +132,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       String asyncId = message.getStr(ASYNC);
 
-      Set<String> okayExceptions = new HashSet<>(1);
-      okayExceptions.add(NonExistentCoreException.class.getName());
+
       ZkNodeProps internalMsg = message.plus(NAME, collection);
 
       if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
@@ -132,8 +141,16 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
 
       clusterState = new ClusterStateMutator(ocmh.cloudManager) .deleteCollection(clusterState, collection);
+
+      shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+
+      shardRequestTracker =
+          new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader,
+              ocmh.shardHandlerFactory, ocmh.overseer);
+
       @SuppressWarnings({"unchecked"})
-      List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions);
+      List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions,
+          shardHandler, shardRequestTracker);
 
       if (failedReplicas == null) {
         // TODO: handle this in any special way? more logging?
@@ -161,17 +178,30 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
     AddReplicaCmd.Response response = new AddReplicaCmd.Response();
 
-    if (results.get("failure") == null && results.get("exception") == null) {
 
-      response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
-        @Override
-        public AddReplicaCmd.Response call() {
-          // TODO: wait for delete collection?
-          AddReplicaCmd.Response response = new AddReplicaCmd.Response();
-          return response;
+
+    //if (results.get("failure") == null && results.get("exception") == null) {
+
+    ShardHandler finalShardHandler = shardHandler;
+    OverseerCollectionMessageHandler.ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
+    response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
+      @Override
+      public AddReplicaCmd.Response call() {
+        if (finalShardHandler != null && finalShardRequestTracker != null) {
+          try {
+            finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
+            // TODO: wait for delete collection?
+            zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
+          } catch (Exception e) {
+            log.error("Exception waiting for results of delete collection cmd", e);
+          }
         }
-      };
-    }
+
+        AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+        return response;
+      }
+    };
+    //}
     response.clusterState = clusterState;
     return response;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index d7222ea..a176120 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -718,27 +718,42 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     return collectionCmd( message, params, results, stateMatcher, asyncId, Collections.emptySet());
   }
 
+  List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+      NamedList<Object> results, Replica.State stateMatcher,
+      String asyncId, Set<String> okayExceptions) throws KeeperException, InterruptedException {
+    return collectionCmd(message, params, results, stateMatcher, asyncId, okayExceptions, null, null);
+  }
+
   /**
    * Send request to all replicas of a collection
    * @return List of replicas which is not live for receiving the request
    */
   List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-                              NamedList<Object> results, Replica.State stateMatcher, String asyncId, Set<String> okayExceptions) throws KeeperException, InterruptedException {
+                              NamedList<Object> results, Replica.State stateMatcher,
+      String asyncId, Set<String> okayExceptions, ShardHandler shardHandler,
+      ShardRequestTracker shardRequestTracker) throws KeeperException, InterruptedException {
     log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
     String collectionName = message.getStr(NAME);
-    @SuppressWarnings("deprecation")
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
+    boolean processResponses = false;
+
+    if (shardHandler == null) {
+      shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
+      processResponses = true;
+    }
 
     ClusterState clusterState = zkStateReader.getClusterState();
     DocCollection coll = clusterState.getCollectionOrNull(collectionName);
     if (coll == null) return null;
     List<Replica> notLivesReplicas = new ArrayList<>();
-    final ShardRequestTracker shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr("operation"), adminPath, zkStateReader, shardHandlerFactory, overseer);
+    if (shardRequestTracker == null) {
+      shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr("operation"), adminPath, zkStateReader, shardHandlerFactory, overseer);
+    }
     for (Slice slice : coll.getSlices()) {
       notLivesReplicas.addAll(shardRequestTracker.sliceCmd(clusterState, params, stateMatcher, slice, shardHandler));
     }
-
-    shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
+    if (processResponses) {
+      shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
+    }
     return notLivesReplicas;
   }
 
@@ -1021,7 +1036,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     private final Overseer overseer;
     private final String operation;
 
-    private ShardRequestTracker(String asyncId, String operation, String adminPath, ZkStateReader reader,  HttpShardHandlerFactory shardHandlerFactory,  Overseer overseer) {
+    ShardRequestTracker(String asyncId, String operation, String adminPath, ZkStateReader reader, HttpShardHandlerFactory shardHandlerFactory, Overseer overseer) {
       this.asyncId = asyncId;
       this.adminPath = adminPath;
       this.operation = operation;