Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/10 04:38:26 UTC

[lucene-solr] 02/05: @1130 Tuning collection delete

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit b38a92ddf07ec50a417f36ee7b9bb7d06d25afe8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 20:42:04 2020 -0600

    @1130 Tuning collection delete
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 110 +++++++++++----------
 .../cloud/api/collections/DeleteCollectionCmd.java |  28 +++---
 .../org/apache/solr/common/cloud/SolrZkClient.java |   8 +-
 3 files changed, 77 insertions(+), 69 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3535dfb..4f73290 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -956,11 +956,11 @@ public class Overseer implements SolrCloseable {
 
           Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
 
-          ParWork.getRootSharedExecutor().submit(() -> {
+          overseer.getTaskExecutor().submit(() -> {
             try {
               runAsync(items, fullPaths, data);
             } catch (Exception e) {
-              log.error("failed processing collection queue items " + items);
+              log.error("failed processing collection queue items " + items, e);
             }
           });
         } finally {
@@ -969,73 +969,81 @@ public class Overseer implements SolrCloseable {
 
       }
 
-      private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
+      private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) {
         for (Map.Entry<String,byte[]> entry : data.entrySet()) {
-          byte[] item = entry.getValue();
-          if (item == null) {
-            log.error("empty item {}", entry.getKey());
-            continue;
-          }
-
-          final ZkNodeProps message = ZkNodeProps.load(item);
           try {
-            String operation = message.getStr(Overseer.QUEUE_OPERATION);
-            if (operation == null) {
-              log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+            byte[] item = entry.getValue();
+            if (item == null) {
+              log.error("empty item {}", entry.getKey());
               continue;
             }
 
-            final String asyncId = message.getStr(ASYNC);
+            final ZkNodeProps message = ZkNodeProps.load(item);
+            try {
+              String operation = message.getStr(Overseer.QUEUE_OPERATION);
+              if (operation == null) {
+                log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+                continue;
+              }
 
-            OverseerSolrResponse response;
-            if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
-              response = configMessageHandler.processMessage(message, operation);
-            } else {
-              response = collMessageHandler.processMessage(message, operation);
-            }
+              final String asyncId = message.getStr(ASYNC);
 
-            //          try {
-            //            overseer.writePendingUpdates();
-            //          } catch (InterruptedException e) {
-            //            log.error("Overseer state update queue processing interrupted");
-            //            return;
-            //          }
+              OverseerSolrResponse response;
+              if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+                response = configMessageHandler.processMessage(message, operation);
+              } else {
+                response = collMessageHandler.processMessage(message, operation);
+              }
 
-            log.info("response {}", response);
+              //          try {
+              //            overseer.writePendingUpdates();
+              //          } catch (InterruptedException e) {
+              //            log.error("Overseer state update queue processing interrupted");
+              //            return;
+              //          }
+
+              log.info("response {}", response);
+
+              if (asyncId != null) {
+                if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
+                  if (log.isDebugEnabled()) {
+                    log.debug("Updated failed map for task with id:[{}]", asyncId);
+                  }
+                  failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
+                } else {
+                  if (log.isDebugEnabled()) {
+                    log.debug("Updated completed map for task with zkid:[{}]", asyncId);
+                  }
+                  completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
 
-            if (asyncId != null) {
-              if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
-                if (log.isDebugEnabled()) {
-                  log.debug("Updated failed map for task with id:[{}]", asyncId);
                 }
-                failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
               } else {
-                if (log.isDebugEnabled()) {
-                  log.debug("Updated completed map for task with zkid:[{}]", asyncId);
-                }
-                completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
-
+                byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
+                String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
+                zkController.getZkClient().setData(responsePath, sdata, true);
+                log.debug("Completed task:[{}] {}", message, response.getResponse());
               }
-            } else {
-              byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
-              String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
-              zkController.getZkClient().setData(responsePath, sdata, true);
-              log.debug("Completed task:[{}] {}", message, response.getResponse());
+
+            } catch (InterruptedException e) {
+              log.error("Overseer state update queue processing interrupted");
+              return;
             }
 
-          } catch (InterruptedException e) {
-            log.error("Overseer state update queue processing interrupted");
-            return;
+          } catch (Exception e) {
+            log.warn("Exception deleting processed zk nodes", e);
           }
-        }
+          try {
+            for (String item : items) {
+              if (item.startsWith("qnr-")) {
+                fullPaths.remove(path + "/" + item);
+              }
+            }
 
-        for (String item : items) {
-          if (item.startsWith("qnr-")) {
-            fullPaths.remove(path + "/" + item);
+            zkController.getZkClient().delete(fullPaths, true);
+          } catch (Exception e) {
+            log.warn("Exception deleting processed zk nodes", e);
           }
         }
-
-        zkController.getZkClient().delete(fullPaths, true);
       }
     }
   }
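
For context, the Overseer.java hunks above route the async work through the
Overseer's own task executor, log the processing failure with its stack trace,
and move each queue entry's handling (and the ZooKeeper cleanup) inside a
per-entry try/catch so one malformed or failing item no longer aborts the whole
batch. A minimal sketch of that isolation pattern, using hypothetical
ItemHandler and NodeStore interfaces rather than Solr's real types:

    import java.util.List;
    import java.util.Map;

    class QueueDrainSketch {
      // Hypothetical stand-ins for Solr's message handlers and SolrZkClient.
      interface ItemHandler { void handle(String key, byte[] payload) throws Exception; }
      interface NodeStore { void delete(List<String> paths) throws Exception; }

      static void drain(Map<String, byte[]> data, List<String> processedPaths,
                        ItemHandler handler, NodeStore store) {
        for (Map.Entry<String, byte[]> entry : data.entrySet()) {
          try {
            byte[] item = entry.getValue();
            if (item == null) {
              continue;                      // skip empty payloads, keep draining
            }
            handler.handle(entry.getKey(), item);
          } catch (Exception e) {
            // isolate the failure: log-and-continue keeps the queue moving
            System.err.println("failed processing " + entry.getKey() + ": " + e);
          }
          try {
            store.delete(processedPaths);    // best-effort cleanup of processed nodes
          } catch (Exception e) {
            System.err.println("cleanup failed: " + e);
          }
        }
      }
    }
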
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index f86dd55..2ac109e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -79,7 +79,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
   @Override
   public AddReplicaCmd.Response call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
-    log.info("delete collection called");
+    log.info("delete collection called {}", message);
     Object o = message.get(MaintainRoutedAliasCmd.INVOKED_BY_ROUTED_ALIAS);
     if (o != null) {
       ((Runnable)o).run(); // this will ensure the collection is removed from the alias before it disappears.
@@ -116,6 +116,14 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       SolrZkClient zkClient = zkStateReader.getZkClient();
       SolrSnapshotManager.cleanupCollectionLevelSnapshots(zkClient, collection);
 
+      log.info("Check if collection exists in zookeeper {}", collection);
+
+      if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
+      }
+
+
+      log.info("Collection exists, remove it, {}", collection);
       // remove collection-level metrics history
       if (deleteHistory) {
         MetricsHistoryHandler historyHandler = ocmh.overseer.getCoreContainer().getMetricsHistoryHandler();
@@ -132,25 +140,15 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       String asyncId = message.getStr(ASYNC);
 
-
       ZkNodeProps internalMsg = message.plus(NAME, collection);
 
-      if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
-      }
-
-
-      clusterState = new ClusterStateMutator(ocmh.cloudManager) .deleteCollection(clusterState, collection);
+      clusterState = new ClusterStateMutator(ocmh.cloudManager).deleteCollection(clusterState, collection);
 
       shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
 
-      shardRequestTracker =
-          new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader,
-              ocmh.shardHandlerFactory, ocmh.overseer);
+      shardRequestTracker = new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader, ocmh.shardHandlerFactory, ocmh.overseer);
 
-      @SuppressWarnings({"unchecked"})
-      List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions,
-          shardHandler, shardRequestTracker);
+      @SuppressWarnings({"unchecked"}) List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions, shardHandler, shardRequestTracker);
 
       if (failedReplicas == null) {
         // TODO: handle this in any special way? more logging?
@@ -190,8 +188,6 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         if (finalShardHandler != null && finalShardRequestTracker != null) {
           try {
             finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
-            // TODO: wait for delete collection?
-            zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
           } catch (Exception e) {
             log.error("Exception waiting for results of delete collection cmd", e);
           }
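
The DeleteCollectionCmd.java hunks above mostly reorder the command: the check
that the collection's znode exists now runs before the metrics-history cleanup
and the cluster-state mutation, and the extra waitForState after the responses
are processed is dropped. A minimal sketch of that fail-fast ordering, with a
hypothetical ZkClient interface standing in for SolrZkClient:

    class DeleteOrderingSketch {
      // Hypothetical stand-in for SolrZkClient#exists.
      interface ZkClient { boolean exists(String path) throws Exception; }

      static final String COLLECTIONS_ZKNODE = "/collections";

      static void deleteCollection(ZkClient zk, String collection) throws Exception {
        // 1. Cheap precondition first: refuse requests for collections not in ZK.
        if (!zk.exists(COLLECTIONS_ZKNODE + "/" + collection)) {
          throw new IllegalArgumentException("Could not find collection: " + collection);
        }
        // 2. Only then run the destructive steps (metrics-history cleanup,
        //    cluster-state mutation, per-replica unload requests, ...).
      }
    }
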
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 8b0fb93..f352a88 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -315,9 +315,13 @@ public class SolrZkClient implements Closeable {
       throws KeeperException, InterruptedException {
     ZooKeeper keeper = connManager.getKeeper();
     if (retryOnConnLoss) {
-      return zkCmdExecutor.retryOperation(() -> keeper.exists(path, null) != null);
+      Stat existsStat = zkCmdExecutor.retryOperation(() -> keeper.exists(path, null));
+      log.info("exists state return is {} {}", path, existsStat);
+      return existsStat != null;
     } else {
-      return keeper.exists(path, null) != null;
+      Stat existsStat = keeper.exists(path, null);
+      log.info("exists state return is {} {}", path, existsStat);
+      return existsStat != null;
     }
   }
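
The SolrZkClient.java hunk keeps the Stat returned by keeper.exists() in a local
so it can be logged before being collapsed to a boolean. A minimal sketch of the
same pattern against the plain ZooKeeper client (logging shown with printf here;
the commit uses log.info):

    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    class ExistsSketch {
      static boolean exists(ZooKeeper keeper, String path) throws Exception {
        Stat stat = keeper.exists(path, null);  // null watcher, as in SolrZkClient
        System.out.printf("exists %s -> %s%n", path, stat);
        return stat != null;
      }
    }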