You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/02/11 13:37:09 UTC

[lucene-solr] branch jira/solr15138_8x updated: porting PRS improvements from master

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr15138_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr15138_8x by this push:
     new dcd538d  porting PRS improvements from master
dcd538d is described below

commit dcd538daf5f04f40c3e4a52f9dd1a9e5ac1c4880
Author: Noble Paul <no...@gmail.com>
AuthorDate: Fri Feb 12 00:36:34 2021 +1100

    porting PRS improvements from master
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 24 +++++++
 .../solr/cloud/RefreshCollectionMessage.java       | 58 ++++++++++++++++
 .../cloud/api/collections/CreateCollectionCmd.java | 78 +++++++++++++++++-----
 .../apache/solr/common/cloud/PerReplicaStates.java | 13 ++++
 4 files changed, 156 insertions(+), 17 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3f52f74..b287f0c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -95,6 +96,7 @@ public class Overseer implements SolrCloseable {
 
   public static final int NUM_RESPONSES_TO_STORE = 10000;
   public static final String OVERSEER_ELECT = "/overseer_elect";
+  private final CopyOnWriteArrayList<Message> unprocessedMessages = new CopyOnWriteArrayList<>();
 
   private SolrMetricsContext solrMetricsContext;
   private volatile String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
@@ -200,6 +202,13 @@ public class Overseer implements SolrCloseable {
                 }
                 // force flush to ZK after each message because there is no fallback if workQueue items
                 // are removed from workQueue but fail to be written to ZK
+                while (unprocessedMessages.size() > 0) {
+                  zkStateWriter.writePendingUpdates();
+                  Message m = unprocessedMessages.remove(0);
+                  if (m instanceof RefreshCollectionMessage) {
+                    clusterState = ((RefreshCollectionMessage) m).run(clusterState, Overseer.this);
+                  }
+                }
                 try {
                   clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
                 } catch (Exception e) {
@@ -1018,4 +1027,19 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data);
   }
 
+  /**Submit an intra-process message; it is only enqueued in memory here (thread-safe list, no ZK write).
+   * This will be picked up and executed when clusterstate updater thread runs (only RefreshCollectionMessage is handled there; other types are dequeued and dropped).
+   * @param message the message to enqueue */
+  public void submit(Message message) {
+    unprocessedMessages.add(message); // NOTE(review): confirm the updater drains this list outside the workQueue-replay path too
+  }
+
+  public interface Message { // in-process unit of work handed to the Overseer via submit(Message)
+    Operation getOperation(); // which kind of message this is
+
+    enum Operation {
+      REFRESH_COLL; // refresh one collection's state from ZK, see RefreshCollectionMessage
+    }
+  }
+
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
new file mode 100644
index 0000000..0716ad6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**Message asking the Overseer to refresh its in-memory Cluster State for a given collection from ZooKeeper.
+ * Queue it with Overseer#submit; the Overseer's updater loop then calls {@link #run} and continues with the returned state.
+ */
+public class RefreshCollectionMessage implements Overseer.Message {
+  public final Operation operation; // always Operation.REFRESH_COLL
+  public final String collection; // name of the collection to refresh
+
+  public RefreshCollectionMessage(String collection) {
+    this.operation = Operation.REFRESH_COLL;
+    this.collection = collection;
+  }
+  /** Returns clusterState with this collection dropped if its ZK node is gone, re-read via getCollectionLive if modified, else unchanged. */
+  ClusterState run(ClusterState clusterState, Overseer overseer) throws InterruptedException, KeeperException {
+    Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
+    if (stat == null) {
+      //collection does not exist in ZK any more: copyWith(..., null) drops it from the returned state
+      return clusterState.copyWith(collection, null);
+    }
+    DocCollection coll = clusterState.getCollectionOrNull(collection);
+    if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
+      //our state is up to date (isModified reports no change vs. the ZK node's version/cversion)
+      return clusterState;
+    } else { // unknown locally or stale: fetch the live collection state from ZK
+      coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
+      return clusterState.copyWith(collection, coll);
+    }
+  }
+
+  @Override
+  public Operation getOperation() {
+    return operation;
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index a9299cc..31d8675 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -41,9 +41,12 @@ import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.RefreshCollectionMessage;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
@@ -51,6 +54,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.ZkConfigManager;
@@ -114,6 +118,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
     final String alias = message.getStr(ALIAS, collectionName);
     log.info("Create collection {}", collectionName);
+    final boolean isPrs = message.getBool(DocCollection.PER_REPLICA_STATE, false);
     if (clusterState.hasCollection(collectionName)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
     }
@@ -148,6 +153,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     // fail fast if parameters are wrong or incomplete
     List<String> shardNames = populateShardNames(message, router);
     checkReplicaTypes(message);
+    DocCollection newColl = null;
+    final String collectionPath = ZkStateReader.getCollectionPath(collectionName);
 
     AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
 
@@ -170,27 +177,38 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
 
       createCollectionZkNode(stateManager, collectionName, collectionParams);
-      
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
-
-      // wait for a while until we see the collection
-      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-      boolean created = false;
-      while (! waitUntil.hasTimedOut()) {
-        waitUntil.sleep(100);
-        created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
-        if (created) break;
-      }
-      if (!created) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+      if(isPrs) {
+        ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
+        byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+        ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
+        clusterState = clusterState.copyWith(collectionName, command.collection);
+        newColl = command.collection;
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+
+        // wait for a while until we see the collection
+        TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+        boolean created = false;
+        while (!waitUntil.hasTimedOut()) {
+          waitUntil.sleep(100);
+          created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+          if (created) break;
+        }
+        if (!created) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+        }
+
+        // refresh cluster state
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        newColl = clusterState.getCollection(collectionName);
+
       }
 
-      // refresh cluster state
-      clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
 
       List<ReplicaPosition> replicaPositions = null;
       try {
-        replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), message, shardNames, sessionWrapper);
+        replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, newColl, message, shardNames, sessionWrapper);
       } catch (Assign.AssignmentException e) {
         ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
         new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@@ -250,7 +268,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
               ZkStateReader.NODE_NAME_PROP, nodeName,
               ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
               CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          if(isPrs) {
+            ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
+            byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+//        log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
+            ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
+            clusterState = clusterState.copyWith(collectionName, command.collection);
+            newColl = command.collection;
+          } else {
+            ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          }
         }
 
         // Need to create new params for each request
@@ -300,6 +327,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
       @SuppressWarnings({"rawtypes"})
       boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
+      if(isPrs) {
+        TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
+        PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        while (!timeout.hasTimedOut()) {
+          if(prs.allActive()) break;
+          Thread.sleep(100);
+          prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        }
+        if (prs.allActive()) {
+          // we have successfully found all replicas to be ACTIVE
+          // Now ask Overseer to fetch the latest state of collection
+          // from ZK
+          ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+        } else {
+          failure = true;
+        }
+      }
       if (failure) {
         // Let's cleanup as we hit an exception
         // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index af4bb43..be40066 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -67,6 +67,8 @@ public class PerReplicaStates implements ReflectMapWriter {
   @JsonProperty
   public final SimpleMap<State> states;
 
+  private volatile Boolean allActive;
+
   /**
    * Construct with data read from ZK
    * @param path path from where this is loaded
@@ -92,6 +94,17 @@ public class PerReplicaStates implements ReflectMapWriter {
 
   }
 
+  /** Check and return if all replicas are ACTIVE; returns true when there are no replica entries at all.
+   * The result is cached on the first call, so later changes to {@code states}, if any, are not reflected. */
+  public boolean allActive() {
+    if (this.allActive != null) return allActive;
+    boolean[] result = new boolean[]{true}; // single-element array so the lambda below can update it
+    states.forEachEntry((r, s) -> {
+      if (s.state != Replica.State.ACTIVE) result[0] = false;
+    });
+    return this.allActive = result[0]; // cache for subsequent calls
+  }
+
   /**Get the changed replicas
    */
   public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {