You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/09 16:11:53 UTC
[lucene-solr] branch reference_impl_dev updated: @1120 Finishing DeleteCollectionCmd.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new dc9ff2f @1120 Finishing DeleteCollectionCmd.
dc9ff2f is described below
commit dc9ff2f38c82861a4cbd9f873d103a446710992e
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Nov 9 10:11:21 2020 -0600
@1120 Finishing DeleteCollectionCmd.
---
.../java/org/apache/solr/cloud/ZkShardTerms.java | 4 ++
.../cloud/api/collections/DeleteCollectionCmd.java | 56 +++++++++++++++++-----
.../OverseerCollectionMessageHandler.java | 29 ++++++++---
3 files changed, 69 insertions(+), 20 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index a44795a..6fedd89 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -338,6 +338,10 @@ public class ZkShardTerms implements AutoCloseable{
Stat stat = new Stat();
byte[] data = zkClient.getData(znodePath, null, stat, true);
newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+ } catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) log.debug("No node found for refresh terms", e);
+ // we have likely been deleted
+ return;
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 1666912..f86dd55 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -27,6 +27,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -38,7 +39,9 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.handler.admin.MetricsHistoryHandler;
+import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +63,12 @@ import java.util.stream.Collectors;
public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ static Set<String> okayExceptions = new HashSet<>(1);
+ static {
+ okayExceptions.add(NonExistentCoreException.class.getName());
+ }
+
private final OverseerCollectionMessageHandler ocmh;
private final TimeSource timeSource;
@@ -77,7 +86,8 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
final String extCollection = message.getStr(NAME);
ZkStateReader zkStateReader = ocmh.zkStateReader;
-
+ ShardHandler shardHandler = null;
+ OverseerCollectionMessageHandler.ShardRequestTracker shardRequestTracker = null;
boolean skipFinalStateWork = false;
if (zkStateReader.aliasesManager != null) { // not a mock ZkStateReader
@@ -122,8 +132,7 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
String asyncId = message.getStr(ASYNC);
- Set<String> okayExceptions = new HashSet<>(1);
- okayExceptions.add(NonExistentCoreException.class.getName());
+
ZkNodeProps internalMsg = message.plus(NAME, collection);
if (!zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection)) {
@@ -132,8 +141,16 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
clusterState = new ClusterStateMutator(ocmh.cloudManager) .deleteCollection(clusterState, collection);
+
+ shardHandler = ocmh.shardHandlerFactory.getShardHandler(ocmh.overseerLbClient);
+
+ shardRequestTracker =
+ new OverseerCollectionMessageHandler.ShardRequestTracker(asyncId, message.getStr("operation"), ocmh.adminPath, zkStateReader,
+ ocmh.shardHandlerFactory, ocmh.overseer);
+
@SuppressWarnings({"unchecked"})
- List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions);
+ List<Replica> failedReplicas = ocmh.collectionCmd(internalMsg, params, results, null, asyncId, okayExceptions,
+ shardHandler, shardRequestTracker);
if (failedReplicas == null) {
// TODO: handle this in any special way? more logging?
@@ -161,17 +178,30 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
AddReplicaCmd.Response response = new AddReplicaCmd.Response();
- if (results.get("failure") == null && results.get("exception") == null) {
- response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
- @Override
- public AddReplicaCmd.Response call() {
- // TODO: wait for delete collection?
- AddReplicaCmd.Response response = new AddReplicaCmd.Response();
- return response;
+
+ //if (results.get("failure") == null && results.get("exception") == null) {
+
+ ShardHandler finalShardHandler = shardHandler;
+ OverseerCollectionMessageHandler.ShardRequestTracker finalShardRequestTracker = shardRequestTracker;
+ response.asyncFinalRunner = new OverseerCollectionMessageHandler.Finalize() {
+ @Override
+ public AddReplicaCmd.Response call() {
+ if (finalShardHandler != null && finalShardRequestTracker != null) {
+ try {
+ finalShardRequestTracker.processResponses(results, finalShardHandler, false, null, okayExceptions);
+ // TODO: wait for delete collection?
+ zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (l, c) -> c == null);
+ } catch (Exception e) {
+ log.error("Exception waiting for results of delete collection cmd", e);
+ }
}
- };
- }
+
+ AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+ return response;
+ }
+ };
+ //}
response.clusterState = clusterState;
return response;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index d7222ea..a176120 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -718,27 +718,42 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
return collectionCmd( message, params, results, stateMatcher, asyncId, Collections.emptySet());
}
+ List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+ NamedList<Object> results, Replica.State stateMatcher,
+ String asyncId, Set<String> okayExceptions) throws KeeperException, InterruptedException {
+ return collectionCmd(message, params, results, stateMatcher, asyncId, okayExceptions, null, null);
+ }
+
/**
* Send request to all replicas of a collection
* @return List of replicas which is not live for receiving the request
*/
List<Replica> collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
- NamedList<Object> results, Replica.State stateMatcher, String asyncId, Set<String> okayExceptions) throws KeeperException, InterruptedException {
+ NamedList<Object> results, Replica.State stateMatcher,
+ String asyncId, Set<String> okayExceptions, ShardHandler shardHandler,
+ ShardRequestTracker shardRequestTracker) throws KeeperException, InterruptedException {
log.info("Executing Collection Cmd={}, asyncId={}", params, asyncId);
String collectionName = message.getStr(NAME);
- @SuppressWarnings("deprecation")
- ShardHandler shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
+ boolean processResponses = false;
+
+ if (shardHandler == null) {
+ shardHandler = shardHandlerFactory.getShardHandler(overseerLbClient);
+ processResponses = true;
+ }
ClusterState clusterState = zkStateReader.getClusterState();
DocCollection coll = clusterState.getCollectionOrNull(collectionName);
if (coll == null) return null;
List<Replica> notLivesReplicas = new ArrayList<>();
- final ShardRequestTracker shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr("operation"), adminPath, zkStateReader, shardHandlerFactory, overseer);
+ if (shardRequestTracker == null) {
+ shardRequestTracker = new ShardRequestTracker(asyncId, message.getStr("operation"), adminPath, zkStateReader, shardHandlerFactory, overseer);
+ }
for (Slice slice : coll.getSlices()) {
notLivesReplicas.addAll(shardRequestTracker.sliceCmd(clusterState, params, stateMatcher, slice, shardHandler));
}
-
- shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
+ if (processResponses) {
+ shardRequestTracker.processResponses(results, shardHandler, false, null, okayExceptions);
+ }
return notLivesReplicas;
}
@@ -1021,7 +1036,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private final Overseer overseer;
private final String operation;
- private ShardRequestTracker(String asyncId, String operation, String adminPath, ZkStateReader reader, HttpShardHandlerFactory shardHandlerFactory, Overseer overseer) {
+ ShardRequestTracker(String asyncId, String operation, String adminPath, ZkStateReader reader, HttpShardHandlerFactory shardHandlerFactory, Overseer overseer) {
this.asyncId = asyncId;
this.adminPath = adminPath;
this.operation = operation;