You are viewing a plain-text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/07 07:14:42 UTC

[lucene-solr] branch reference_impl_dev updated: @769 Some work on addReplica and async.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 3a58695  @769 Some work on addReplica and async.
3a58695 is described below

commit 3a58695fa5e08ada310dce67f47f2e3e5a1bd6c3
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Sep 7 02:14:27 2020 -0500

    @769 Some work on addReplica and async.
---
 .../solr/cloud/api/collections/AddReplicaCmd.java  | 33 +++++++++++++++++++---
 .../OverseerCollectionMessageHandler.java          | 29 ++++++++++---------
 .../solr/handler/admin/CollectionsHandler.java     |  2 +-
 .../test/org/apache/solr/cloud/AddReplicaTest.java | 20 ++++++++-----
 4 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 9407f05..fab63a4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -193,6 +193,7 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       shardRequestTracker.sendShardRequest(createReplica.node, params, shardHandler);
     }
 
+    int finalTotalReplicas = totalReplicas;
     Runnable runnable = () -> {
       try {
         shardRequestTracker.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica");
@@ -202,18 +203,42 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
         ParWork.propegateInterrupt(e);
         throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Interrupted", e);
       }
-      for (CreateReplica replica : createReplicas) {
-        ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName);
+
+      if (asyncId != null) {
+        List<String> coreNodeNames = new ArrayList<>();
+        for (CreateReplica replica : createReplicas) {
+          coreNodeNames.add(ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName));
+        }
+
+        SolrCloseableLatch latch = new SolrCloseableLatch(finalTotalReplicas, ocmh);
+        ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, coreNodeNames, null, latch);
+        try {
+          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+          try {
+            if (!latch.await(timeout, TimeUnit.SECONDS)) {
+              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
+            }
+          } catch (InterruptedException e) {
+            ParWork.propegateInterrupt(e);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+          }
+        } finally {
+          zkStateReader.removeCollectionStateWatcher(collectionName, watcher);
+        }
       }
+
       if (onComplete != null) onComplete.run();
     };
 
 
 
     if (asyncId == null) {
-
+      List<String> coreNodeNames = new ArrayList<>();
+      for (CreateReplica replica : createReplicas) {
+        coreNodeNames.add(ocmh.waitForCoreNodeName(zkStateReader, collectionName, replica.node, replica.coreName));
+      }
       SolrCloseableLatch latch = new SolrCloseableLatch(totalReplicas, ocmh);
-      ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, null, createReplicas.stream().map(createReplica -> createReplica.coreName).collect(Collectors.toList()), latch);
+      ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collectionName, coreNodeNames, null, latch);
       try {
         zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
         ParWork.getRootSharedExecutor().execute(runnable);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 126804d..e7603f7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -894,7 +894,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       Watcher waitForAsyncId = new Watcher() {
         @Override
         public void process(WatchedEvent event) {
-          if (Watcher.Event.EventType.None.equals(event.getType()) && !Watcher.Event.KeeperState.Expired.equals(event.getState())) {
+          if (Watcher.Event.EventType.None.equals(event.getType())) {
             return;
           }
           if (event.getType().equals(Watcher.Event.EventType.NodeCreated)) {
@@ -902,20 +902,21 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
           } else if (event.getType().equals(Event.EventType.NodeDeleted)) {
             // no-op: gets deleted below once we're done with it
             return;
-          } else {
-            Stat rstats2 = null;
-            try {
-              rstats2 = zkStateReader.getZkClient().exists(asyncPathToWaitOn, this);
-            } catch (KeeperException e) {
-              log.error("ZooKeeper exception", e);
-            } catch (InterruptedException e) {
-              ParWork.propegateInterrupt(e);
-              return;
-            }
-            if (rstats2 != null) {
-              latch.countDown();
-            }
           }
+
+          Stat rstats2 = null;
+          try {
+            rstats2 = zkStateReader.getZkClient().exists(asyncPathToWaitOn, this);
+          } catch (KeeperException e) {
+            log.error("ZooKeeper exception", e);
+          } catch (InterruptedException e) {
+            ParWork.propegateInterrupt(e);
+            return;
+          }
+          if (rstats2 != null) {
+            latch.countDown();
+          }
+
         }
       };
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 983eb2a..15239cb 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -300,7 +300,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
       if (collection != null) {
         DocCollection coll = coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
-        if (coll != null) {
+        if (coll != null && !action.equals(DELETE)) {
           rsp.add("csver", coll.getZNodeVersion());
         } else {
           // deleted
diff --git a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
index f798c6b..fe680df 100644
--- a/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/AddReplicaTest.java
@@ -23,6 +23,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.LinkedHashSet;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -48,6 +49,9 @@ import org.slf4j.LoggerFactory;
 public class AddReplicaTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+  private AtomicInteger asyncId = new AtomicInteger();
+
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(3)
@@ -159,8 +163,9 @@ public class AddReplicaTest extends SolrCloudTestCase {
     String sliceName = coll.getSlices().iterator().next().getName();
     Collection<Replica> replicas = coll.getSlice(sliceName).getReplicas();
     CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collection, sliceName);
-    addReplica.processAsync("000", cloudClient);
-    CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus("000");
+    int aid1 = asyncId.incrementAndGet();
+    addReplica.processAsync(Integer.toString(aid1), cloudClient);
+    CollectionAdminRequest.RequestStatus requestStatus = CollectionAdminRequest.requestStatus(Integer.toString(aid1));
     CollectionAdminRequest.RequestStatusResponse rsp = requestStatus.process(cloudClient);
 
     assertNotSame(rsp.getRequestStatus(), COMPLETED);
@@ -183,10 +188,14 @@ public class AddReplicaTest extends SolrCloudTestCase {
     replicas2.removeAll(replicas);
     assertEquals(1, replicas2.size());
 
+
+    cluster.waitForActiveCollection(collection, 2, 2);
+
     // use waitForFinalState
     addReplica.setWaitForFinalState(true);
-    addReplica.processAsync("001", cloudClient);
-    requestStatus = CollectionAdminRequest.requestStatus("001");
+    int aid2 = asyncId.incrementAndGet();
+    addReplica.processAsync(Integer.toString(aid2), cloudClient);
+    requestStatus = CollectionAdminRequest.requestStatus(Integer.toString(aid2));
     rsp = requestStatus.process(cloudClient);
     assertNotSame(rsp.getRequestStatus(), COMPLETED);
     // wait for async request success
@@ -209,9 +218,6 @@ public class AddReplicaTest extends SolrCloudTestCase {
     String replica2 = replicas2.iterator().next().getName();
     assertEquals(2, replicas3.size());
     for (Replica replica : replicas3) {
-      if (replica.getName().equals(replica2)) {
-        continue; // may be still recovering
-      }
       assertSame(coll.toString() + "\n" + replica.toString(), replica.getState(), Replica.State.ACTIVE);
     }
   }