You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/07 07:14:42 UTC
[lucene-solr] branch reference_impl_dev updated: @769 Some work on addReplica and async.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 3a58695 @769 Some work on addReplica and async.
3a58695 is described below
commit 3a58695fa5e08ada310dce67f47f2e3e5a1bd6c3
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Sep 7 02:14:27 2020 -0500
@769 Some work on addReplica and async.
---
.../solr/cloud/api/collections/AddReplicaCmd.java | 33 +++++++++++++++++++---
.../OverseerCollectionMessageHandler.java | 29 ++++++++++---------
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../test/org/apache/solr/cloud/AddReplicaTest.java | 20 ++++++++-----
4 files changed, 58 insertions(+), 26 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 9407f05..fab63a4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -193,6 +193,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
}
+ int finalTotalReplicas = totalReplicas;
Runnable runnable = () -> {
try {
shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
@@ -202,18 +203,42 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
ParWork.propegateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Interrupted", e);
}
- for (CreateReplica replica : createReplicas) {
- ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName);
+
+ if (asyncId != null) {
+ List<String> coreNodeNames = new ArrayList<>();
+ for (CreateReplica replica : createReplicas) {
+ coreNodeNames.add(ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName));
+ }
+
+ SolrCloseableLatch latch = new SolrCloseableLatch(finalTotalReplicas, ocmh);
+ ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, coreNodeNames, null, latch);
+ try {
+ zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ try {
+ if (!latch.await(timeout, TimeUnit.SECONDS)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
+ }
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+ } finally {
+ zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
+ }
}
+
if (onComplete != null) onComplete.run();
};
if (asyncId == null) {
-
+ List<String> coreNodeNames = new ArrayList<>();
+ for (CreateReplica replica : createReplicas) {
+ coreNodeNames.add(ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName));
+ }
SolrCloseableLatch latch = new SolrCloseableLatch(totalReplicas, ocmh);
- ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, createReplicas.stream().map(createReplica -> createReplica.coreName).collect(Collectors.toList()), latch);
+ ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, coreNodeNames, null, latch);
try {
zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
ParWork.getRootSharedExecutor().execute(runnable);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 126804d..e7603f7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -894,7 +894,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
Watcher waitForAsyncId = new Watcher() {
@Override
public void process(WatchedEvent event) {
- if (Watcher.Event.EventType.None.equals(event.getType()) && !Watcher.Event.KeeperState.Expired.equals(event.getState())) {
+ if (Watcher.Event.EventType.None.equals(event.getType())) {
return;
}
if (event.getType().equals(Watcher.Event.EventType.NodeCreated)) {
@@ -902,20 +902,21 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
} else if (event.getType().equals(Event.EventType.NodeDeleted)) {
// no-op: gets deleted below once we're done with it
return;
- } else {
- Stat rstats2 = null;
- try {
- rstats2 = zkStateReader.getZkClient().exists(asyncPathToWaitOn, this);
- } catch (KeeperException e) {
- log.error("ZooKeeper exception", e);
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e);
- return;
- }
- if (rstats2 != null) {
- latch.countDown();
- }
}
+
+ Stat rstats2 = null;
+ try {
+ rstats2 = zkStateReader.getZkClient().exists(asyncPathToWaitOn, this);
+ } catch (KeeperException e) {
+ log.error("ZooKeeper exception", e);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
+ return;
+ }
+ if (rstats2 != null) {
+ latch.countDown();
+ }
+
}
};
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 983eb2a..15239cb 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -300,7 +300,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
if (collection != null) {
DocCollection coll = coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
- if (coll != null) {
+ if (coll != null && !action.equals(DELETE)) {
rsp.add("csver", coll.getZNodeVersion());
} else {
// deleted
diff --git a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
index f798c6b..fe680df 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
@@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.EnumSet;
import java.util.LinkedHashSet;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory;
public class AddReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private AtomicInteger asyncId = new AtomicInteger();
+
+
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(3)
@@ -159,8 +163,9 @@ public class AddReplicaTest extends SolrCloudTestCase {
String sliceName = coll.getSlices().iterator().next().getName();
Collection<Replica> replicas = coll.getSlice(sliceName).getReplicas();
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, sliceName);
- addReplica.processAsync("000", cloudClient);
- CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
+ int aid1 = asyncId.incrementAndGet();
+ addReplica.processAsync(Integer.toString(aid1), cloudClient);
+ CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(Integer.toString(aid1));
CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
assertNotSame(rsp.getRequestStatus(), COMPLETED);
@@ -183,10 +188,14 @@ public class AddReplicaTest extends SolrCloudTestCase {
replicas2.removeAll(replicas);
assertEquals(1, replicas2.size());
+
+ cluster.waitForActiveCollection(collection, 2, 2);
+
// use waitForFinalState
addReplica.setWaitForFinalState(true);
- addReplica.processAsync("001", cloudClient);
- requestStatus = CollectionAdminRequest.requestStatus("001");
+ int aid2 = asyncId.incrementAndGet();
+ addReplica.processAsync(Integer.toString(aid2), cloudClient);
+ requestStatus = CollectionAdminRequest.requestStatus(Integer.toString(aid2));
rsp = requestStatus.process(cloudClient);
assertNotSame(rsp.getRequestStatus(), COMPLETED);
// wait for async request success
@@ -209,9 +218,6 @@ public class AddReplicaTest extends SolrCloudTestCase {
String replica2 = replicas2.iterator().next().getName();
assertEquals(2, replicas3.size());
for (Replica replica : replicas3) {
- if (replica.getName().equals(replica2)) {
- continue; // may be still recovering
- }
assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE);
}
}