You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/10 04:38:26 UTC
[lucene-solr] 02/05: @1130 Tuning collection delete
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit b38a92ddf07ec50a417f36ee7b9bb7d06d25afe8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 20:42:04 2020 -0600
@1130 Tuning collection delete
---
.../src/java/org/apache/solr/cloud/Overseer.java | 110 +++++++++++----------
.../cloud/api/collections/DeleteCollectionCmd.java | 28 +++---
.../org/apache/solr/common/cloud/SolrZkClient.java | 8 +-
3 files changed, 77 insertions(+), 69 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3535dfb..4f73290 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -956,11 +956,11 @@ public class Overseer implements SolrCloseable {
Map<String,byte[]> data = zkController.getZkClient().getData(fullPaths);
- ParWork.getRootSharedExecutor().submit(() -> {
+ overseer.getTaskExecutor().submit(() -> {
try {
runAsync(items, fullPaths, data);
} catch (Exception e) {
- log.error("failed processing collection queue items " + items);
+ log.error("failed processing collection queue items " + items, e);
}
});
} finally {
@@ -969,73 +969,81 @@ public class Overseer implements SolrCloseable {
}
- private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) throws KeeperException {
+ private void runAsync(List<String> items, List<String> fullPaths, Map<String,byte[]> data) {
for (Map.Entry<String,byte[]> entry : data.entrySet()) {
- byte[] item = entry.getValue();
- if (item == null) {
- log.error("empty item {}", entry.getKey());
- continue;
- }
-
- final ZkNodeProps message = ZkNodeProps.load(item);
try {
- String operation = message.getStr(Overseer.QUEUE_OPERATION);
- if (operation == null) {
- log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+ byte[] item = entry.getValue();
+ if (item == null) {
+ log.error("empty item {}", entry.getKey());
continue;
}
- final String asyncId = message.getStr(ASYNC);
+ final ZkNodeProps message = ZkNodeProps.load(item);
+ try {
+ String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ if (operation == null) {
+ log.error("Msg does not have required " + Overseer.QUEUE_OPERATION + ": {}", message);
+ continue;
+ }
- OverseerSolrResponse response;
- if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
- response = configMessageHandler.processMessage(message, operation);
- } else {
- response = collMessageHandler.processMessage(message, operation);
- }
+ final String asyncId = message.getStr(ASYNC);
- // try {
- // overseer.writePendingUpdates();
- // } catch (InterruptedException e) {
- // log.error("Overseer state update queue processing interrupted");
- // return;
- // }
+ OverseerSolrResponse response;
+ if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+ response = configMessageHandler.processMessage(message, operation);
+ } else {
+ response = collMessageHandler.processMessage(message, operation);
+ }
- log.info("response {}", response);
+ // try {
+ // overseer.writePendingUpdates();
+ // } catch (InterruptedException e) {
+ // log.error("Overseer state update queue processing interrupted");
+ // return;
+ // }
+
+ log.info("response {}", response);
+
+ if (asyncId != null) {
+ if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Updated failed map for task with id:[{}]", asyncId);
+ }
+ failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Updated completed map for task with zkid:[{}]", asyncId);
+ }
+ completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
- if (asyncId != null) {
- if (response != null && (response.getResponse().get("failure") != null || response.getResponse().get("exception") != null)) {
- if (log.isDebugEnabled()) {
- log.debug("Updated failed map for task with id:[{}]", asyncId);
}
- failureMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
} else {
- if (log.isDebugEnabled()) {
- log.debug("Updated completed map for task with zkid:[{}]", asyncId);
- }
- completedMap.put(asyncId, OverseerSolrResponseSerializer.serialize(response));
-
+ byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
+ String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
+ zkController.getZkClient().setData(responsePath, sdata, true);
+ log.debug("Completed task:[{}] {}", message, response.getResponse());
}
- } else {
- byte[] sdata = OverseerSolrResponseSerializer.serialize(response);
- String responsePath = Overseer.OVERSEER_COLLECTION_MAP_COMPLETED + "/" + OverseerTaskQueue.RESPONSE_PREFIX + entry.getKey().substring(entry.getKey().lastIndexOf("-") + 1);
- zkController.getZkClient().setData(responsePath, sdata, true);
- log.debug("Completed task:[{}] {}", message, response.getResponse());
+
+ } catch (InterruptedException e) {
+ log.error("Overseer state update queue processing interrupted");
+ return;
}
- } catch (InterruptedException e) {
- log.error("Overseer state update queue processing interrupted");
- return;
+ } catch (Exception e) {
+ log.warn("Exception deleting processed zk nodes", e);
}
- }
+ try {
+ for (String item : items) {
+ if (item.startsWith("qnr-")) {
+ fullPaths.remove(path + "/" + item);
+ }
+ }
- for (String item : items) {
- if (item.startsWith("qnr-")) {
- fullPaths.remove(path + "/" + item);
+ zkController.getZkClient().delete(fullPaths, true);
+ } catch (Exception e) {
+ log.warn("Exception deleting processed zk nodes", e);
}
}
-
- zkController.getZkClient().delete(fullPaths, true);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index f86dd55..2ac109e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -79,7 +79,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
@Override
public AddReplicaCmd.Response call(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
- log.info("delete collection called");
+ log.info("delete collection called {}", message);
Object o = message.get(MaintainRoutedAliasCmd.INVOKED_BY_ROUTED_ALIAS);
if (o != null) {
((Runnable)o).run(); // this will ensure the collection is removed from the alias before it disappears.
@@ -116,6 +116,14 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
SolrZkClient zkClient = zkStateReader.getZkClient();
SolrSnapshotManager.cleanupCollectionLevelSnapshots(zkClient, collection);
+ log.info("Check if collection exists in zookeeper {}", collection);
+
+ if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
+ }
+
+
+ log.info("Collection exists, remove it, {}", collection);
// remove collection-level metrics history
if (deleteHistory) {
MetricsHistoryHandler historyHandler = ocmh.overseer.getCoreContainer().getMetricsHistoryHandler();
@@ -132,25 +140,15 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
String asyncId = message.getStr(ASYNC);
-
ZkNodeProps internalMsg = message.plus(NAME, collection);
- if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Could not find collection");
- }
-
-
- clusterState = new ClusterStateMutator(ocmh.cloudManager) .deleteCollection(clusterState, collection);
+ clusterState = new ClusterStateMutator(ocmh.cloudManager).deleteCollection(clusterState, collection);
shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
- shardRequestTracker =
- new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader,
- ocmh.shardHandlerFactory, ocmh.overseer);
+ shardRequestTracker = new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader, ocmh.shardHandlerFactory, ocmh.overseer);
- @SuppressWarnings({"unchecked"})
- List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions,
- shardHandler, shardRequestTracker);
+ @SuppressWarnings({"unchecked"}) List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions, shardHandler, shardRequestTracker);
if (failedReplicas == null) {
// TODO: handle this in any special way? more logging?
@@ -190,8 +188,6 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if (finalShardHandler != null && finalShardRequestTracker != null) {
try {
finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
- // TODO: wait for delete collection?
- zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
} catch (Exception e) {
log.error("Exception waiting for results of delete collection cmd", e);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 8b0fb93..f352a88 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -315,9 +315,13 @@ public class SolrZkClient implements Closeable {
throws KeeperException, InterruptedException {
ZooKeeper keeper = connManager.getKeeper();
if (retryOnConnLoss) {
- return zkCmdExecutor.retryOperation(() -> keeper.exists(path, null) != null);
+ Stat existsStat = zkCmdExecutor.retryOperation(() -> keeper.exists(path, null));
+ log.info("exists state return is {} {}", path, existsStat);
+ return existsStat != null;
} else {
- return keeper.exists(path, null) != null;
+ Stat existsStat = keeper.exists(path, null);
+ log.info("exists state return is {} {}", path, existsStat);
+ return existsStat != null;
}
}