You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2021/02/12 06:01:36 UTC

[lucene-solr] branch jira/solr15138_8x updated: General cleanup, address review comment

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch jira/solr15138_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr15138_8x by this push:
     new 2a2bba1  General cleanup, address review comment
2a2bba1 is described below

commit 2a2bba1c4d1fdbf78f0b671cf4b92bfeb476fff0
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Fri Feb 12 11:31:03 2021 +0530

    General cleanup, address review comment
---
 solr/core/src/java/org/apache/solr/cloud/Overseer.java  |  5 +++--
 .../org/apache/solr/cloud/RefreshCollectionMessage.java |  8 ++++----
 .../solr/cloud/api/collections/CreateCollectionCmd.java | 17 +++++++++++------
 .../org/apache/solr/common/cloud/PerReplicaStates.java  |  8 +++++---
 .../org/apache/solr/common/cloud/ZkStateReader.java     |  6 +++---
 .../solr/client/solrj/impl/CloudSolrClientTest.java     |  2 +-
 6 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 358ae71..a10e8a6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -1027,8 +1027,9 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data);
   }
 
-  /**Submit an intra-process message
-   * This will be picked up and executed when clusterstate updater thread runs
+  /**
+   * Submit an intra-process message which will be picked up and executed when {@link ClusterStateUpdater}'s
+   * loop runs next time
    */
   public void submit(Message message) {
     unprocessedMessages.add(message);
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index 0716ad6..5375322 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -23,8 +23,8 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 
-/**Refresh the Cluster State for a given collection
- *
+/**
+ * Refresh the ClusterState for a given collection
  */
 public class RefreshCollectionMessage implements Overseer.Message {
   public final Operation operation;
@@ -38,12 +38,12 @@ public class RefreshCollectionMessage implements Overseer.Message {
   ClusterState run(ClusterState clusterState, Overseer overseer) throws InterruptedException, KeeperException {
     Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
     if (stat == null) {
-      //collection does not exist
+      // collection does not exist
       return clusterState.copyWith(collection, null);
     }
     DocCollection coll = clusterState.getCollectionOrNull(collection);
     if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
-      //our state is up to date
+      // our state is up to date
       return clusterState;
     } else {
       coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index c430c0d..6c1d51c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -119,7 +119,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
     final String alias = message.getStr(ALIAS, collectionName);
     log.info("Create collection {}", collectionName);
-    final boolean isPrs = message.getBool(DocCollection.PER_REPLICA_STATE, false);
+    final boolean isPRS = message.getBool(DocCollection.PER_REPLICA_STATE, false);
     if (clusterState.hasCollection(collectionName)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
     }
@@ -179,7 +179,10 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       createCollectionZkNode(stateManager, collectionName, collectionParams);
 
-      if(isPrs) {
+      if (isPRS) {
+        // In case of a PRS collection, create the collection structure directly instead of resubmitting
+        // to the overseer queue.
+        // TODO: Consider doing this for all collections, not just the PRS collections.
         ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
         byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
         ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
@@ -269,10 +272,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
               ZkStateReader.NODE_NAME_PROP, nodeName,
               ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
               CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          if(isPrs) {
+          if (isPRS) {
+            // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
+            // to the overseer queue.
+            // TODO: Consider doing this for all collections, not just the PRS collections.
             ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
             byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
-//        log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
             ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
             clusterState = clusterState.copyWith(collectionName, command.collection);
             newColl = command.collection;
@@ -318,7 +323,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       if(!isLegacyCloud) {
         // wait for all replica entries to be created
         Map<String, Replica> replicas ;
-        if(isPrs) {
+        if (isPRS) {
           replicas = new ConcurrentHashMap<>();
           newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
               .filter(r -> coresToCreate.containsKey(r.getCoreName()))       // Only the elements that were asked for...
@@ -337,7 +342,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
       @SuppressWarnings({"rawtypes"})
       boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
-      if(isPrs) {
+      if (isPRS) {
         TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
         PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
         while (!timeout.hasTimedOut()) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index be40066..4fdfc32 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 import org.apache.solr.cluster.api.SimpleMap;
@@ -67,7 +68,7 @@ public class PerReplicaStates implements ReflectMapWriter {
   @JsonProperty
   public final SimpleMap<State> states;
 
-  private volatile Boolean allActive;
+  private volatile AtomicBoolean allActive;
 
   /**
    * Construct with data read from ZK
@@ -97,12 +98,13 @@ public class PerReplicaStates implements ReflectMapWriter {
   /** Check and return if all replicas are ACTIVE
    */
   public boolean allActive() {
-    if (this.allActive != null) return allActive;
+    if (this.allActive != null) return allActive.get();
     boolean[] result = new boolean[]{true};
     states.forEachEntry((r, s) -> {
       if (s.state != Replica.State.ACTIVE) result[0] = false;
     });
-    return this.allActive = result[0];
+    this.allActive.set(result[0]);
+    return this.allActive.get();
   }
 
   /**Get the changed replicas
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index d9e831e..3ac45b4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1400,7 +1400,7 @@ public class ZkStateReader implements SolrCloseable {
         }
 
       } catch (NoNodeException e) {
-        log.info("{} is deleted, stop watching children", collectionPath);
+        log.debug("{} is deleted, stop watching children", collectionPath);
       }
     }
   }
@@ -1620,7 +1620,7 @@ public class ZkStateReader implements SolrCloseable {
       ClusterState.initReplicaStateProvider(() -> {
         try {
           PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
-          log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+          log.debug("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
           return replicaStates;
         } catch (Exception e) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Error fetching per-replica-states");
@@ -1764,7 +1764,7 @@ public class ZkStateReader implements SolrCloseable {
         v = new CollectionWatch<>();
         watchSet.set(true);
       }
-      log.info("already watching , added to stateWatchers");
+      log.debug("already watching , added to stateWatchers");
       v.stateWatchers.add(stateWatcher);
       return v;
     });
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index e6920bf..b85499d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -1100,7 +1100,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
     assertEquals(4, prs.states.size());
 
-    //Now let's do an add replica
+    // Now let's do an add replica
     CollectionAdminRequest
         .addReplicaToShard(testCollection, "shard1")
         .process(cluster.getSolrClient());