You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/02/15 05:53:42 UTC

[lucene-solr] branch jira/solr15138_2 created (now b1ce5a3)

This is an automated email from the ASF dual-hosted git repository.

noble pushed a change to branch jira/solr15138_2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


      at b1ce5a3  fixing the shardsplit tests

This branch includes the following new commits:

     new b1ce5a3  fixing the shardsplit tests

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[lucene-solr] 01/01: fixing the shardsplit tests

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr15138_2
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit b1ce5a335556d8bc303a4301e27cedf4c7f5bc3a
Author: noblepaul <no...@gmail.com>
AuthorDate: Mon Feb 15 16:53:00 2021 +1100

    fixing the shardsplit tests
---
 .../src/java/org/apache/solr/cloud/Overseer.java   | 22 +++++
 .../solr/cloud/RefreshCollectionMessage.java       | 51 +++++++++++
 .../cloud/api/collections/CreateCollectionCmd.java | 98 +++++++++++++++++-----
 .../apache/solr/common/cloud/PerReplicaStates.java | 14 ++++
 .../solr/common/cloud/PerReplicaStatesOps.java     | 18 ++--
 5 files changed, 175 insertions(+), 28 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3f52f74..ff23ddd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -95,6 +96,7 @@ public class Overseer implements SolrCloseable {
 
   public static final int NUM_RESPONSES_TO_STORE = 10000;
   public static final String OVERSEER_ELECT = "/overseer_elect";
+  private final CopyOnWriteArrayList<Message> unprocessedMessages = new CopyOnWriteArrayList<>();
 
   private SolrMetricsContext solrMetricsContext;
   private volatile String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
@@ -260,6 +262,13 @@ public class Overseer implements SolrCloseable {
 
                 processedNodes.add(head.first());
                 fallbackQueueSize = processedNodes.size();
+                // force flush to ZK after each message because there is no fallback if workQueue items
+                // are removed from workQueue but fail to be written to ZK
+                while (unprocessedMessages.size() > 0) {
+                  clusterState = zkStateWriter.writePendingUpdates();
+                  Message m = unprocessedMessages.remove(0);
+                  clusterState = m.run(clusterState, Overseer.this);
+                }
                 // The callback always be called on this thread
                 clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
                   stateUpdateQueue.remove(processedNodes);
@@ -1018,4 +1027,17 @@ public class Overseer implements SolrCloseable {
     getStateUpdateQueue().offer(data);
   }
 
+  /**
+   * Submit an intra-process message which will be picked up and executed when {@link ClusterStateUpdater}'s
+   * loop runs next time
+   */
+  public void submit(Message message) {
+    unprocessedMessages.add(message);
+  }
+
+  public interface Message {
+    ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
+
+  }
+
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
new file mode 100644
index 0000000..2f221af
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Refresh the Cluster State for a given collection
+ *
+ */
+public class RefreshCollectionMessage implements Overseer.Message {
+  public final String collection;
+
+  public RefreshCollectionMessage(String collection) {
+    this.collection = collection;
+  }
+
+  public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
+    Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
+    if (stat == null) {
+      //collection does not exist
+      return clusterState.copyWith(collection, null);
+    }
+    DocCollection coll = clusterState.getCollectionOrNull(collection);
+    if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
+      //our state is up to date
+      return clusterState;
+    } else {
+      coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
+      return clusterState.copyWith(collection, coll);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index a9299cc..744adf1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -41,9 +42,12 @@ import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
 import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.RefreshCollectionMessage;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Aliases;
@@ -51,6 +55,7 @@ import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
 import org.apache.solr.common.cloud.ZkConfigManager;
@@ -114,6 +119,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
     final String alias = message.getStr(ALIAS, collectionName);
     log.info("Create collection {}", collectionName);
+    final boolean isPRS = message.getBool(DocCollection.PER_REPLICA_STATE, false);
     if (clusterState.hasCollection(collectionName)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
     }
@@ -148,6 +154,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
     // fail fast if parameters are wrong or incomplete
     List<String> shardNames = populateShardNames(message, router);
     checkReplicaTypes(message);
+    DocCollection newColl = null;
+    final String collectionPath = ZkStateReader.getCollectionPath(collectionName);
 
     AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
 
@@ -170,27 +178,42 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
 
       createCollectionZkNode(stateManager, collectionName, collectionParams);
-      
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
-
-      // wait for a while until we see the collection
-      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-      boolean created = false;
-      while (! waitUntil.hasTimedOut()) {
-        waitUntil.sleep(100);
-        created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
-        if (created) break;
-      }
-      if (!created) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+      if (isPRS) {
+        // In case of a PRS collection, create the collection structure directly instead of resubmitting
+        // to the overseer queue.
+        // TODO: Consider doing this for all collections, not just the PRS collections.
+        ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
+        byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+        ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
+        clusterState = clusterState.copyWith(collectionName, command.collection);
+        newColl = command.collection;
+        ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+
+        // wait for a while until we see the collection
+        TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+        boolean created = false;
+        while (!waitUntil.hasTimedOut()) {
+          waitUntil.sleep(100);
+          created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+          if (created) break;
+        }
+        if (!created) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+        }
+
+        // refresh cluster state
+        clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+        newColl = clusterState.getCollection(collectionName);
+
       }
 
-      // refresh cluster state
-      clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
 
       List<ReplicaPosition> replicaPositions = null;
       try {
-        replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), message, shardNames, sessionWrapper);
+        replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, newColl, message, shardNames, sessionWrapper);
       } catch (Assign.AssignmentException e) {
         ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
         new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@@ -230,7 +253,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         }
 
         String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
-            ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
+            newColl,
             replicaPosition.shard, replicaPosition.type, true);
         if (log.isDebugEnabled()) {
           log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
@@ -250,7 +273,19 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
               ZkStateReader.NODE_NAME_PROP, nodeName,
               ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
               CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          if (isPRS) {
+            // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
+            // to the overseer queue.
+            // TODO: Consider doing this for all collections, not just the PRS collections.
+            ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
+            byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+            ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
+            clusterState = clusterState.copyWith(collectionName, command.collection);
+            newColl = command.collection;
+            ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+          } else {
+            ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          }
         }
 
         // Need to create new params for each request
@@ -289,7 +324,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       if(!isLegacyCloud) {
         // wait for all replica entries to be created
-        Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+        Map<String, Replica> replicas ;
+        if (isPRS) {
+          replicas = new ConcurrentHashMap<>();
+          newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
+              .filter(r -> coresToCreate.containsKey(r.getCoreName()))       // Only the elements that were asked for...
+              .forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
+        } else {
+          replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+        }
+
         for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
           ShardRequest sreq = e.getValue();
           sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
@@ -300,6 +344,22 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
       @SuppressWarnings({"rawtypes"})
       boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
+      if (isPRS) {
+        TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
+        PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        while (!timeout.hasTimedOut()) {
+          if(prs.allActive()) break;
+          Thread.sleep(100);
+          prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+        }
+        if (!prs.allActive()) {
+          failure = true;
+        }  // we have successfully found all replicas to be ACTIVE
+
+        // Now ask Overseer to fetch the latest state of collection
+        // from ZK
+        ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+      }
       if (failure) {
         // Let's cleanup as we hit an exception
         // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index af4bb43..975f654 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 
 import org.apache.solr.cluster.api.SimpleMap;
@@ -67,6 +68,8 @@ public class PerReplicaStates implements ReflectMapWriter {
   @JsonProperty
   public final SimpleMap<State> states;
 
+  private Boolean allActive;
+
   /**
    * Construct with data read from ZK
    * @param path path from where this is loaded
@@ -92,6 +95,17 @@ public class PerReplicaStates implements ReflectMapWriter {
 
   }
 
+  /** Check and return if all replicas are ACTIVE
+   */
+  public boolean allActive() {
+    if (this.allActive != null) return allActive;
+    AtomicBoolean result = new AtomicBoolean(true);
+    states.forEachEntry((r, s) -> {
+      if (s.state != Replica.State.ACTIVE) result.set(false);
+    });
+    return this.allActive = result.get();
+  }
+
   /**Get the changed replicas
    */
   public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 1bf0ecc..3d3a184 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -117,7 +117,7 @@ public class PerReplicaStatesOps {
   public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
-      PerReplicaStates.State existing = rs.get(replica);
+      PerReplicaStates.State existing = prs.get(replica);
       if (existing == null) {
         operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
       } else {
@@ -125,7 +125,7 @@ public class PerReplicaStatesOps {
         addDeleteStaleNodes(operations, existing);
       }
       if (log.isDebugEnabled()) {
-        log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, operations);
+        log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
       }
       return operations;
     }).init(rs);
@@ -161,7 +161,7 @@ public class PerReplicaStatesOps {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> ops = new ArrayList<>();
       if (next != null) {
-        PerReplicaStates.State st = rs.get(next);
+        PerReplicaStates.State st = prs.get(next);
         if (st != null) {
           if (!st.isLeader) {
             ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
@@ -177,7 +177,7 @@ public class PerReplicaStatesOps {
 
       // now go through all other replicas and unset previous leader
       for (String r : allReplicas) {
-        PerReplicaStates.State st = rs.get(r);
+        PerReplicaStates.State st = prs.get(r);
         if (st == null) continue;//unlikely
         if (!Objects.equals(r, next)) {
           if (st.isLeader) {
@@ -188,7 +188,7 @@ public class PerReplicaStatesOps {
         }
       }
       if (log.isDebugEnabled()) {
-        log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
+        log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, ops);
       }
       return ops;
     }).init(rs);
@@ -202,10 +202,10 @@ public class PerReplicaStatesOps {
   public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> result;
-      if (rs == null) {
+      if (prs == null) {
         result = Collections.emptyList();
       } else {
-        PerReplicaStates.State state = rs.get(replica);
+        PerReplicaStates.State state = prs.get(replica);
         result = addDeleteStaleNodes(new ArrayList<>(), state);
       }
       return result;
@@ -224,7 +224,7 @@ public class PerReplicaStatesOps {
     return new PerReplicaStatesOps(prs -> {
       List<PerReplicaStates.Operation> operations = new ArrayList<>();
       for (String replica : replicas) {
-        PerReplicaStates.State r = rs.get(replica);
+        PerReplicaStates.State r = prs.get(replica);
         if (r != null) {
           if (r.state == Replica.State.DOWN && !r.isLeader) continue;
           operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
@@ -234,7 +234,7 @@ public class PerReplicaStatesOps {
         }
       }
       if (log.isDebugEnabled()) {
-        log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, operations);
+        log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
       }
       return operations;
     }).init(rs);