You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by il...@apache.org on 2021/02/13 01:42:35 UTC
[lucene-solr] branch master updated: SOLR-14928: allow cluster
state updates to be done in a distributed way and not through Overseer
(#2364)
This is an automated email from the ASF dual-hosted git repository.
ilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 23755dd SOLR-14928: allow cluster state updates to be done in a distributed way and not through Overseer (#2364)
23755dd is described below
commit 23755ddfdd36a9613010cb9e6201127df55be744
Author: Ilan Ginzburg <il...@gmail.com>
AuthorDate: Sat Feb 13 02:42:18 2021 +0100
SOLR-14928: allow cluster state updates to be done in a distributed way and not through Overseer (#2364)
---
solr/CHANGES.txt | 2 +
.../solr/cloud/DistributedClusterStateUpdater.java | 822 +++++++++++++++++++++
.../src/java/org/apache/solr/cloud/Overseer.java | 61 +-
.../apache/solr/cloud/OverseerNodePrioritizer.java | 18 +-
.../solr/cloud/RefreshCollectionMessage.java | 3 +-
.../solr/cloud/ShardLeaderElectionContext.java | 15 +-
.../solr/cloud/ShardLeaderElectionContextBase.java | 9 +-
.../java/org/apache/solr/cloud/ZkController.java | 82 +-
.../solr/cloud/api/collections/AddReplicaCmd.java | 18 +-
.../solr/cloud/api/collections/AliasCmd.java | 2 +-
.../cloud/api/collections/CreateCollectionCmd.java | 70 +-
.../solr/cloud/api/collections/CreateShardCmd.java | 16 +-
.../cloud/api/collections/DeleteCollectionCmd.java | 11 +-
.../solr/cloud/api/collections/DeleteShardCmd.java | 22 +-
.../solr/cloud/api/collections/MigrateCmd.java | 8 +-
.../OverseerCollectionMessageHandler.java | 46 +-
.../cloud/api/collections/OverseerStatusCmd.java | 10 +
.../api/collections/ReindexCollectionCmd.java | 29 +-
.../solr/cloud/api/collections/RestoreCmd.java | 52 +-
.../solr/cloud/api/collections/SplitShardCmd.java | 62 +-
.../solr/cloud/overseer/CollectionMutator.java | 23 +-
.../apache/solr/cloud/overseer/NodeMutator.java | 90 ++-
.../apache/solr/cloud/overseer/SliceMutator.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 31 +-
.../apache/solr/cloud/overseer/ZkWriteCommand.java | 43 +-
.../src/java/org/apache/solr/core/CloudConfig.java | 17 +-
.../java/org/apache/solr/core/SolrXmlConfig.java | 3 +
.../solr/handler/admin/CollectionsHandler.java | 32 +-
.../processor/DistributedZkUpdateProcessor.java | 12 +-
.../solr/cloud/CreateCollectionCleanupTest.java | 2 +
.../org/apache/solr/cloud/DeleteReplicaTest.java | 34 +-
.../org/apache/solr/cloud/DeleteShardTest.java | 13 +-
.../test/org/apache/solr/cloud/MockSolrSource.java | 15 +-
.../OverseerCollectionConfigSetProcessorTest.java | 266 +++++--
.../org/apache/solr/cloud/OverseerRolesTest.java | 7 +-
.../org/apache/solr/cloud/OverseerStatusTest.java | 35 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 43 +-
.../solr/cloud/TestRandomRequestDistribution.java | 11 +-
.../solr/cloud/TestSkipOverseerOperations.java | 29 +-
.../org/apache/solr/cloud/ZkControllerTest.java | 21 +-
solr/server/solr/solr.xml | 1 +
solr/solr-ref-guide/src/format-of-solr-xml.adoc | 4 +
.../solr/common/cloud/PerReplicaStatesOps.java | 9 -
.../apache/solr/common/cloud/ZkStateReader.java | 10 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 1 +
.../org/apache/solr/cloud/SolrCloudTestCase.java | 54 ++
46 files changed, 1790 insertions(+), 376 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 378cb54..c05cd84 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -201,6 +201,8 @@ Other Changes
* SOLR-15118: Switch /v2/collections APIs over to the now-preferred annotated-POJO implementation approach (Jason Gerlowski)
+* SOLR-14928: Allow cluster state updates to be done in a distributed fashion without going through Overseer (Ilan Ginzburg)
+
Bug Fixes
---------------------
* SOLR-14546: Fix for a relatively hard to hit issue in OverseerTaskProcessor that could lead to out of order execution
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
new file mode 100644
index 0000000..f57cc31
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -0,0 +1,822 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.cloud.overseer.*;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.cloud.overseer.ZkStateWriter.NO_OP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTIONS_ZKNODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+
+/**
+ * Gives access to distributed cluster state update methods and allows code to inquire whether distributed state update is enabled.
+ */
+public class DistributedClusterStateUpdater {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * When {@code true} each node updates Zookeeper directly for changing state.json files. When {@code false} messages
+ * are instead sent to the Overseer and the update is done there.
+ */
+ private final boolean useDistributedStateUpdate;
+
+ /**
+ * Builds an instance with the specified behavior regarding distribution of state updates. When the parameter is
+ * {@code false}, the instance only lets callers find out that distributed updates are not enabled. When it is
+ * {@code true}, the instance gives access to the methods and classes allowing the execution of the updates.
+ *
+ * @param useDistributedStateUpdate when this parameter is {@code false}, the only method expected to ever be called on this
+ * instance is {@link #isDistributedStateUpdate}, and it will return {@code false}.
+ */
+ public DistributedClusterStateUpdater(boolean useDistributedStateUpdate) {
+ this.useDistributedStateUpdate = useDistributedStateUpdate;
+ // Parameterized logging: the message is only assembled when INFO is enabled, so neither a guard nor string
+ // concatenation (and its warning suppression) is needed.
+ log.info("Creating DistributedClusterStateUpdater with useDistributedStateUpdate={}. Solr will be using {} cluster state updates.",
+ useDistributedStateUpdate, useDistributedStateUpdate ? "distributed" : "Overseer based");
+ }
+
+ /**
+ * Returns a fresh {@link StateChangeRecorder} bound to the given collection. The {@code isCollectionCreation} flag tells
+ * the recorder whether the collection is being created or whether an existing collection is being operated on.
+ *
+ * @throws IllegalStateException if distributed state updates are not enabled on this instance
+ */
+ public StateChangeRecorder createStateChangeRecorder(String collectionName, boolean isCollectionCreation) {
+ if (useDistributedStateUpdate) {
+ return new StateChangeRecorder(collectionName, isCollectionCreation);
+ }
+ // Getting here (or failing any similar check in this class) means there's a big bug in the code. No user input can cause this.
+ throw new IllegalStateException("Not expecting to create instances of StateChangeRecorder when not using distributed state update");
+ }
+
+ /**
+ * Convenience method applying one single cluster state change in one call: a {@link StateChangeRecorder} is built for the
+ * collection named in {@code message}, the command is recorded on it and the recorded update is then executed.
+ *
+ * @throws IllegalStateException if distributed state updates are not enabled on this instance
+ */
+ public void doSingleStateUpdate(MutatingCommand command, ZkNodeProps message,
+ SolrCloudManager scm, ZkStateReader zkStateReader) throws KeeperException, InterruptedException {
+ if (!useDistributedStateUpdate) {
+ throw new IllegalStateException("Not expecting to execute doSingleStateUpdate when not using distributed state update");
+ }
+ final StateChangeRecorder recorder =
+ new StateChangeRecorder(command.getCollectionName(message), command.isCollectionCreation());
+ recorder.record(command, message);
+ recorder.executeStateUpdates(scm, zkStateReader);
+ }
+
+ /**
+ * Hands the state update for the given node over to {@link CollectionNodeDownChangeCalculator#executeNodeDownStateUpdate}.
+ *
+ * @throws IllegalStateException if distributed state updates are not enabled on this instance
+ */
+ public void executeNodeDownStateUpdate(String nodeName, ZkStateReader zkStateReader) {
+ if (useDistributedStateUpdate) {
+ CollectionNodeDownChangeCalculator.executeNodeDownStateUpdate(nodeName, zkStateReader);
+ return;
+ }
+ throw new IllegalStateException("Not expecting to execute executeNodeDownStateUpdate when not using distributed state update");
+ }
+
+ /**
+ * Tells whether this instance was built with distributed state updates enabled. If the answer is {@code false}, callers
+ * should fall back to the legacy behavior of enqueueing cluster state update messages to Overseer and should not call
+ * any other method of this class.
+ *
+ * @return the value of the flag passed to the constructor
+ */
+ public boolean isDistributedStateUpdate() {
+ return this.useDistributedStateUpdate;
+ }
+
+ /**
+ * Names of enum instances are the mutator object name (e.g. {@code Cluster} for {@link ClusterStateMutator} or
+ * {@code Collection} for {@link CollectionMutator}) followed by the method name of the mutator.
+ * For example {@link #SliceAddReplica} represents {@link SliceMutator#addReplica}.
+ * <p>
+ * Even though the various mutator classes do not implement any common interface, luckily their constructors and methods
+ * take the same set of parameters so all can be called from the enum method {@link #buildWriteCommand(SolrCloudManager, ClusterState, ZkNodeProps)}.
+ * <p>
+ * Each instance is built with an optional {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
+ * (instances built with {@code null} cannot be looked up through {@link #getCommandFor}) and with the name of the message
+ * property from which {@link #getCollectionName} reads the collection name.
+ * <p>
+ * Given that {@link OverseerAction#DOWNNODE} is different (it returns a list of write commands and impacts more than one collection),
+ * it is handled specifically in {@link CollectionNodeDownChangeCalculator#executeNodeDownStateUpdate}.
+ */
+ public enum MutatingCommand {
+ BalanceShardsUnique(BALANCESHARDUNIQUE, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(cs, message);
+ // Next line is where the actual work is done
+ if (dProp.balanceProperty()) {
+ return new ZkWriteCommand(getCollectionName(message), dProp.getDocCollection());
+ } else {
+ // balanceProperty() returned false: nothing to write
+ return NO_OP;
+ }
+ }
+ },
+ ClusterCreateCollection(CREATE, CommonParams.NAME) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new ClusterStateMutator(scm).createCollection(cs, message);
+ }
+
+ // The only command overriding the default: it is the one creating a collection
+ @Override
+ public boolean isCollectionCreation() {
+ return true;
+ }
+ },
+ ClusterDeleteCollection(DELETE, CommonParams.NAME) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new ClusterStateMutator(scm).deleteCollection(cs, message);
+ }
+ },
+ CollectionDeleteShard(DELETESHARD, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new CollectionMutator(scm).deleteShard(cs, message);
+ }
+ },
+ CollectionModifyCollection(MODIFYCOLLECTION, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new CollectionMutator(scm).modifyCollection(cs, message);
+ }
+ },
+ CollectionCreateShard(CREATESHARD, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new CollectionMutator(scm).createShard(cs, message);
+ }
+ },
+ ReplicaAddReplicaProperty(ADDREPLICAPROP, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new ReplicaMutator(scm).addReplicaProperty(cs, message);
+ }
+ },
+ ReplicaDeleteReplicaProperty(DELETEREPLICAPROP, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new ReplicaMutator(scm).deleteReplicaProperty(cs, message);
+ }
+ },
+ // The commands below built with a null action are not registered in the action lookup map (see static initializer)
+ ReplicaSetState(null, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new ReplicaMutator(scm).setState(cs, message);
+ }
+ },
+ SliceAddReplica(ADDREPLICA, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new SliceMutator(scm).addReplica(cs, message);
+ }
+ },
+ SliceAddRoutingRule(null, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new SliceMutator(scm).addRoutingRule(cs, message);
+ }
+ },
+ SliceRemoveReplica(null, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new SliceMutator(scm).removeReplica(cs, message);
+ }
+ },
+ SliceRemoveRoutingRule(null, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new SliceMutator(scm).removeRoutingRule(cs, message);
+ }
+ },
+ SliceSetShardLeader(null, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new SliceMutator(scm).setShardLeader(cs, message);
+ }
+ },
+ SliceUpdateShardState(null, ZkStateReader.COLLECTION_PROP) {
+ @Override
+ public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+ return new SliceMutator(scm).updateShardState(cs, message);
+ }
+ };
+
+ /**
+ * Lookup from Collection API action to the command registered for it. Backs {@link #getCommandFor}.
+ */
+ private static final EnumMap<CollectionParams.CollectionAction, MutatingCommand> actionsToCommands;
+
+ static {
+ actionsToCommands = new EnumMap<>(CollectionParams.CollectionAction.class);
+ for (MutatingCommand mc : MutatingCommand.values()) {
+ // Commands built with a null action are left out of the lookup map
+ if (mc.collectionAction != null) {
+ actionsToCommands.put(mc.collectionAction, mc);
+ }
+ }
+ }
+
+ /** The Collection API action associated with this command, or {@code null} when there is none. */
+ private final CollectionParams.CollectionAction collectionAction;
+ /** Name of the message property from which {@link #getCollectionName} reads the collection name. */
+ private final String collectionNameParamName;
+
+ MutatingCommand(CollectionParams.CollectionAction collectionAction, String collectionNameParamName) {
+ this.collectionAction = collectionAction;
+ this.collectionNameParamName = collectionNameParamName;
+ }
+
+ /**
+ * mutating commands that return a single ZkWriteCommand override this method
+ */
+ public abstract ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message);
+
+ /**
+ * @return the value, read as a string from {@code message}, of the property this command was configured with for
+ * carrying the collection name.
+ */
+ public String getCollectionName(ZkNodeProps message) {
+ return message.getStr(collectionNameParamName);
+ }
+
+ /**
+ * @return the {@link MutatingCommand} corresponding to the passed {@link org.apache.solr.common.params.CollectionParams.CollectionAction} or
+ * {@code null} if no cluster state update command is defined for that action (given that {@link org.apache.solr.common.params.CollectionParams.CollectionAction}
+ * are used for the Collection API and only some are used for the cluster state updates, this is expected).
+ */
+ public static MutatingCommand getCommandFor(CollectionParams.CollectionAction collectionAction) {
+ return actionsToCommands.get(collectionAction);
+ }
+
+ /**
+ * Given only one command creates a collection {@link #ClusterCreateCollection}, the default implementation is provided here.
+ *
+ * @return {@code false} unless overridden by the enum instance.
+ */
+ public boolean isCollectionCreation() {
+ return false;
+ }
+ }
+
+ /**
+ * Instances of this class are the fundamental building block of the CAS (Compare and Swap) update approach. These instances
+ * accept an initial cluster state (as present in Zookeeper basically) and apply to it a set of modifications that are
+ * then attempted to be written back to Zookeeper ({@link ZkUpdateApplicator} is driving this process).
+ * If the update fails (due to a concurrent update), the Zookeeper content is read again, the changes (updates) are
+ * applied to it again and a new write attempt is made. This guarantees that an update does not overwrite data just
+ * written by a concurrent update happening from the same or from another node.
+ */
+ interface StateChangeCalculator {
+ /**
+ * @return the name of the single collection targeted by the updates of this calculator.
+ */
+ String getCollectionName();
+
+ /**
+ * @return {@code true} if this updater is computing updates for creating a collection that does not exist yet.
+ */
+ boolean isCollectionCreation();
+
+ /**
+ * Given an initial {@link ClusterState}, computes after applying updates the cluster state to be written to state.json
+ * (made available through {@link #getUpdatedClusterState()}) as well as the list of per replica operations (made available
+ * through {@link #getPerReplicaStatesOps()}). Either or both of these methods will return {@code null} if there is no
+ * corresponding update to apply.
+ */
+ void computeUpdates(ClusterState currentState);
+
+ /**
+ * Method can only be called after {@link #computeUpdates} has been called.
+ * @return the new state to write into {@code state.json} or {@code null} if no update needed.
+ */
+ ClusterState getUpdatedClusterState();
+
+ /**
+ * Method can only be called after {@link #computeUpdates} has been called.
+ * @return {@code null} when there are no per replica state ops
+ */
+ List<PerReplicaStatesOps> getPerReplicaStatesOps();
+ }
+
+ /**
+ * This class is passed a {@link StateChangeCalculator} targeting a single collection that is able to apply an update to an
+ * initial cluster state and return the updated cluster state. The {@link StateChangeCalculator} is used (possibly multiple times)
+ * to do a Compare And Swap (a.k.a conditional update or CAS) of the collection's {@code state.json} Zookeeper file.<p>
+ *
+ * When there are per replica states to update, they are attempted once (they do their own Compare And Swap), before
+ * the (potentially multiple) attempts to update the {@code state.json} file. This conforms to the strategy in place
+ * when {@code state.json} updates are sent to the Overseer to do. See {@link ZkStateWriter#writePendingUpdates}.
+ */
+ static private class ZkUpdateApplicator {
+ /**
+ * When trying to update a {@code state.json} file that keeps getting changed by concurrent updater, the number of attempts
+ * made before giving up. This is likely way too high, if we get to 50 failed attempts something else went wrong.
+ * To be reconsidered once Collection API commands are distributed as well.
+ */
+ public static final int CAS_MAX_ATTEMPTS = 50;
+
+ private final ZkStateReader zkStateReader;
+ private final StateChangeCalculator updater;
+
+ /**
+ * Entry point: builds a single-use {@link ZkUpdateApplicator} for the given reader and calculator and runs the update with it.
+ */
+ static void applyUpdate(ZkStateReader zkStateReader, StateChangeCalculator updater) throws KeeperException, InterruptedException {
+ new ZkUpdateApplicator(zkStateReader, updater).applyUpdate();
+ }
+
+ /**
+ * Private: instances are only built by the static {@code applyUpdate} method of this class.
+ */
+ private ZkUpdateApplicator(ZkStateReader zkStateReader, StateChangeCalculator updater) {
+ this.updater = updater;
+ this.zkStateReader = zkStateReader;
+ }
+
+ /**
+ * By delegating work to {@link PerReplicaStatesOps} for per replica state updates, and using optimistic locking
+ * (with retries) to directly update the content of {@code state.json}, updates Zookeeper with the changes computed
+ * by the {@link StateChangeCalculator}.
+ *
+ * @throws KeeperException a {@link KeeperException.BadVersionException} is thrown once {@link #CAS_MAX_ATTEMPTS} attempts
+ * have failed, or right away when the failing write is a collection creation.
+ */
+ private void applyUpdate() throws KeeperException, InterruptedException {
+ /* Initial slightly naive implementation (later on we should consider some caching between updates...).
+ * For updates:
+ * - Read the state.json file from Zookeeper
+ * - Run the updater to execute the changes on top of that file
+ * - Compare and Swap the file with the new version (fail if something else changed ZK in the meantime)
+ * - Retry a few times all above steps if update is failing.
+ *
+ * For creations:
+ * - Build the state.json file using the updater
+ * - Try to write it to Zookeeper (do not overwrite if it exists)
+ * - Fail (without retries) if write failed.
+ */
+
+ // Note we DO NOT track nor use the live nodes in the cluster state.
+ // That may mean the two abstractions (collection metadata vs. nodes) should be separated.
+ // For now trying to diverge as little as possible from existing data structures and code given the need to
+ // support both the old way (Overseer) and new way (distributed) of handling cluster state update.
+ final Set<String> liveNodes = Collections.emptySet();
+
+ // Per Replica States updates are done before all other updates and not subject to the number of attempts of CAS
+ // made here, given they have their own CAS strategy and implementation (see PerReplicaStatesOps.persist()).
+ // The flag is only cleared once per replica states ops have actually been persisted (see below).
+ boolean firstAttempt = true;
+
+ // When there are multiple retries of state.json write and the cluster state gets updated over and over again with
+ // the changes done in the per replica states, we avoid refetching those multiple times.
+ PerReplicaStates fetchedPerReplicaStates = null;
+
+ // Later on (when Collection API commands are distributed) we will have to rely on the version of state.json
+ // to implement the replacement of Collection API locking. Then we should not blindly retry cluster state updates
+ // as we do here but instead intelligently fail (or retry completely) the Collection API call when seeing that
+ // state.json was changed by a concurrent command execution.
+ // The loop below is ok for distributing cluster state updates from Overseer to all nodes while Collection API
+ // commands are still executed on the Overseer and manage their locking the old fashioned way.
+ for (int attempt = 0; attempt < CAS_MAX_ATTEMPTS; attempt++) {
+ // Start by reading the current state.json (if this is an update).
+ // TODO Eventually rethink the way each node manages and caches its copy of the cluster state. Knowing about all collections in the cluster might not be needed.
+ ClusterState initialClusterState;
+ if (updater.isCollectionCreation()) {
+ initialClusterState = new ClusterState(liveNodes, Collections.emptyMap());
+ } else {
+ // Get the state for existing data in ZK (and if no data exists we should fail)
+ initialClusterState = fetchStateForCollection();
+ }
+
+ // Apply the desired changes. Note that the cluster state passed to the chain of mutators is totally up to date
+ // (it's read from ZK just above). So assumptions made in the mutators (like SliceMutator.removeReplica() deleting
+ // the whole collection if it's not found) are ok. Actually in the removeReplica case, the collection will always
+ // exist otherwise the call to fetchStateForCollection() above would have failed.
+ updater.computeUpdates(initialClusterState);
+
+ ClusterState updatedState = updater.getUpdatedClusterState();
+ List<PerReplicaStatesOps> allStatesOps = updater.getPerReplicaStatesOps();
+
+ if (firstAttempt && allStatesOps != null) {
+ // Do the per replica states updates (if any) before the state.json update (if any)
+ firstAttempt = false;
+
+ // The parent node of the per replica state nodes happens to be the node of state.json.
+ String prsParentNode = ZkStateReader.getCollectionPath(updater.getCollectionName());
+
+ for (PerReplicaStatesOps prso : allStatesOps) {
+ prso.persist(prsParentNode, zkStateReader.getZkClient());
+ }
+ }
+
+ if (updatedState == null) {
+ // No update to state.json needed
+ return;
+ }
+
+ // Get the latest version of the collection from the cluster state first.
+ // There is no notion of "cached" here (the boolean passed below) as the updatedState is based on CollectionRef
+ DocCollection docCollection = updatedState.getCollectionOrNull(updater.getCollectionName(), true);
+
+ // If we did update per replica states and we're also updating state.json, update the content of state.json to reflect
+ // the changes made to replica states. Not strictly necessary (the state source of truth is in per replica states), but nice to have...
+ if (allStatesOps != null) {
+ if (docCollection != null) {
+ // Fetch the per replica states updates done previously or skip fetching if we already have them
+ fetchedPerReplicaStates = PerReplicaStates.fetch(docCollection.getZNode(), zkStateReader.getZkClient(), fetchedPerReplicaStates);
+ // Transpose the per replica states into the cluster state
+ updatedState = updatedState.copyWith(updater.getCollectionName(), docCollection.copyWith(fetchedPerReplicaStates));
+ }
+ }
+
+ try {
+ // Try to do a conditional update (a.k.a. CAS: compare and swap).
+ doStateDotJsonCasUpdate(updatedState);
+ return; // state.json updated successfully.
+ } catch (KeeperException.BadVersionException bve) {
+ if (updater.isCollectionCreation()) {
+ // Not expecting to see this exception when creating new state.json fails, so throwing it up the food chain.
+ throw bve;
+ }
+ // Otherwise the exception is deliberately dropped here: falling out of the catch block starts the next attempt.
+ }
+ // We've tried to update an existing state.json and got a BadVersionException. We'll try again a few times.
+ // When only two threads compete, no point in waiting: if we lost this time we'll get it next time right away.
+ // But if more threads compete, then waiting a bit (random delay) can improve our chances. The delay should likely
+ // be proportional to the time between reading the cluster state and updating it. We can measure it in the loop above.
+ // With "per replica states" collections, concurrent attempts of even just two threads are expected to be extremely rare.
+ }
+
+ // We made quite a few attempts but failed repeatedly. This is pretty bad but we can't loop trying forever.
+ // Offering a job to the Overseer wouldn't usually fail if the ZK queue can be written to (but the Overseer can then
+ // loop forever attempting the update).
+ // We do want whoever called us to fail right away rather than to wait for a cluster change and timeout because it
+ // didn't happen. Likely need to review call by call what is the appropriate behaviour, especially once Collection
+ // API is distributed (because then the Collection API call will fail if the underlying cluster state update cannot
+ // be done, and that's a desirable thing).
+ throw new KeeperException.BadVersionException(ZkStateReader.getCollectionPath(updater.getCollectionName()));
+ }
+
+ /**
+ * After the computing of the new {@link ClusterState} containing all needed updates to the collection based on what the
+ * {@link StateChangeCalculator} computed, this method does an update in ZK to the collection's {@code state.json}. It is the
+ * equivalent of Overseer's {@link ZkStateWriter#writePendingUpdates} (in its actions related to {@code state.json}
+ * as opposed to the per replica states).
+ * <p>
+ * Note that in a similar way to what happens in {@link ZkStateWriter#writePendingUpdates}, collection delete is handled
+ * as a special case. (see comment on {@link DistributedClusterStateUpdater.StateChangeRecorder.RecordedMutationsPlayer}
+ * on why the code has to be duplicated)<p>
+ *
+ * <b>Note for the future:</b> Given this method is where the actually write to ZK is done, that's the place where we
+ * can rebuild a DocCollection with updated zk version. Eventually if we maintain a cache of recently used collections,
+ * we want to capture the updated collection and put it in the cache to avoid reading it again (unless it changed,
+ * the CAS will fail and we will refresh).<p>
+ *
+ * This could serve as the basis for a strategy where each node does not need any view of all collections in the cluster
+ * but only a cache of recently used collections (possibly not even needing watches on them, but we'll discuss this later).
+ */
+ private void doStateDotJsonCasUpdate(ClusterState updatedState) throws KeeperException, InterruptedException {
+ String jsonPath = ZkStateReader.getCollectionPath(updater.getCollectionName());
+
+ // Collection delete
+ if (!updatedState.hasCollection(updater.getCollectionName())) {
+ // We do not have a collection znode version to test we delete the right version of state.json. But this doesn't really matter:
+ // if we had one, and the delete failed (because state.json got updated in the meantime), we would re-read the collection
+ // state, update our version, run the CAS delete again and it will pass. Which means that one way or another, deletes are final.
+ // I hope nobody deletes a collection then creates a new one with the same name immediately (although the creation should fail
+ // if the znode still exists, so the creation would only succeed after the delete made it, and we're ok).
+ // With Overseer based updates the same behavior can be observed: a collection update is enqueued followed by the
+ // collection delete before the update was executed.
+ log.debug("going to recursively delete state.json at {}", jsonPath);
+ zkStateReader.getZkClient().clean(jsonPath);
+ } else {
+ // Collection update or creation
+ DocCollection collection = updatedState.getCollection(updater.getCollectionName());
+ byte[] stateJson = Utils.toJSON(singletonMap(updater.getCollectionName(), collection));
+
+ if (updater.isCollectionCreation()) {
+ // The state.json file does not exist yet (more precisely it is assumed not to exist)
+ log.debug("going to create collection {}", jsonPath);
+ zkStateReader.getZkClient().create(jsonPath, stateJson, CreateMode.PERSISTENT, true);
+ } else {
+ // We're updating an existing state.json
+ if (log.isDebugEnabled()) {
+ log.debug("going to update collection {} version: {}", jsonPath, collection.getZNodeVersion());
+ }
+ zkStateReader.getZkClient().setData(jsonPath, stateJson, collection.getZNodeVersion(), true);
+ }
+ }
+ }
+
+ /**
+ * Creates a {@link ClusterState} with the state of an existing single collection, with no live nodes information.
+ * Eventually this state should be reused across calls if it is fresh enough... (we have to deal anyway with failures
+ * of conditional updates so trying to use non fresh data is ok, a second attempt will be made)
+ */
+ private ClusterState fetchStateForCollection() throws KeeperException, InterruptedException {
+ String collectionStatePath = ZkStateReader.getCollectionPath(updater.getCollectionName());
+ Stat stat = new Stat();
+ byte[] data = zkStateReader.getZkClient().getData(collectionStatePath, null, stat, true);
+ ClusterState clusterState = ClusterState.createFromJson(stat.getVersion(), data, Collections.emptySet());
+ return clusterState;
+ }
+ }
+
+ /**
+ * Class handling the distributed updates of collection's Zookeeper files {@code state.json} based on multiple updates
+ * applied to a single collection (as is sometimes done by *Cmd classes implementing the Collection API commands).<p>
+ * Previously these updates were sent one by one to Overseer and then grouped by org.apache.solr.cloud.Overseer.ClusterStateUpdater.
+ * <p>
+ * Records desired changes to {@code state.json} files in Zookeeper (as are done by the family of mutator classes such as
+ * {@link org.apache.solr.cloud.overseer.ClusterStateMutator}, {@link org.apache.solr.cloud.overseer.CollectionMutator}
+ * etc.) in order to be able to later execute them on the actual content of the {@code state.json} files using optimistic
+ * locking (and retry a few times if the optimistic locking failed).
+ * <p>
+ * Instances are <b>not</b> thread safe.
+ */
+ public static class StateChangeRecorder {
+ final List<Pair<MutatingCommand, ZkNodeProps>> mutations;
+ /**
+ * The name of the collection that all recorded commands apply to
+ */
+ final String collectionName;
+ /**
+ * {@code true} if recorded commands assume creation of the collection {@code state.json} file.<br>
+ * {@code false} if an existing {@code state.json} is to be updated.<p>
+ * <p>
+ * This variable is used for defensive programming and catching issues. It might be removed once we're done removing and testing
+ * the distribution of the cluster state updates.
+ */
+ final boolean isCollectionCreation;
+
+ /**
+ * For collection creation recording, there should be only one actual creation (and it should be the first recorded command).
+ */
+ boolean creationCommandRecorded = false;
+
+ private StateChangeRecorder(String collectionName, boolean isCollectionCreation) {
+ if (collectionName == null) {
+ final String err = "Internal bug. collectionName=null (isCollectionCreation=" + isCollectionCreation + ")";
+ log.error(err);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+ }
+ mutations = new LinkedList<>();
+ this.collectionName = collectionName;
+ this.isCollectionCreation = isCollectionCreation;
+ }
+
+ /**
+ * Records a mutation method and its parameters so that it can be executed later to modify the corresponding Zookeeper state.
+ * Note the message is identical to the one used for communicating with Overseer (at least initially) so it also contains
+ * the action in parameter {@link org.apache.solr.cloud.Overseer#QUEUE_OPERATION}, but that value is ignored here
+ * in favor of the value passed in {@code command}.
+ *
+ * @param message the parameters associated with the command that are kept in the recorded mutations to be played
+ * later. Note that this call usually replaces a call to {@link org.apache.solr.cloud.Overseer#offerStateUpdate(byte[])}
+ * that is passed a <b>copy</b> of the data!<br>
+ * This means that if {@code message} passed in here is reused before the recorded commands are replayed,
+ * things will break! Need to make sure all places calling this method do not reuse the data passed in
+ * (otherwise need to make a copy).
+ */
+ public void record(MutatingCommand command, ZkNodeProps message) {
+ if (isCollectionCreation && !creationCommandRecorded) {
+ // First received command should be collection creation
+ if (!command.isCollectionCreation()) {
+ final String err = "Internal bug. Creation of collection " + collectionName + " unexpected command " + command.name();
+ log.error(err);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+ }
+ creationCommandRecorded = true;
+ } else {
+ // If collection creation already received or not expected, should not get (another) one
+ if (command.isCollectionCreation()) {
+ final String err = "Internal bug. Creation of collection " + collectionName + " unexpected command " +
+ command.name() + " (isCollectionCreation=" + isCollectionCreation + ", creationCommandRecorded=" + creationCommandRecorded + ")";
+ log.error(err);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+ }
+ }
+
+ if (!collectionName.equals(command.getCollectionName(message))) {
+ // All recorded commands must be for same collection
+ final String err = "Internal bug. State change for collection " + collectionName +
+ " received command " + command + " for collection " + command.getCollectionName(message);
+ log.error(err);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+ }
+
+ mutations.add(new Pair<>(command, message));
+ }
+
+ /**
+ * This class allows taking the initial (passed in) cluster state, applying to it cluster mutations and returning the resulting
+ * cluster state.
+ * <p>
+ * It makes it possible to re-apply the same set of changes to the cluster state when the Compare And Swap (conditional
+ * update) fails due to concurrent modification.
+ * <p>
+ * For each mutation, a {@link ZkWriteCommand} is first created (capturing how the mutation impacts the cluster state), this is
+ * the equivalent of what the Overseer is doing in ClusterStateUpdater.processMessage().<p>
+ * <p>
+ * Then, a new {@link ClusterState} is built by replacing the existing collection by its new value as computed in the
+ * {@link ZkWriteCommand}. This is done by Overseer in {@link ZkStateWriter#enqueueUpdate} (and {@link ZkStateWriter} is hard
+ * to reuse because although it contains the logic for doing the update that would be needed here, it is coupled with the
+ * actual instance of {@link ClusterState} being maintained, the stream of updates to be applied to it and applying
+ * the per replica state changes).
+ */
+ private static class RecordedMutationsPlayer implements StateChangeCalculator {
+ private final SolrCloudManager scm;
+ private final String collectionName;
+ private final boolean isCollectionCreation;
+ final List<Pair<MutatingCommand, ZkNodeProps>> mutations;
+
+ // null means no update to state.json needed. Set in computeUpdates()
+ private ClusterState computedState = null;
+
+ // null means no updates needed to the per replica state znodes. Set in computeUpdates()
+ private List<PerReplicaStatesOps> replicaOpsList = null;
+
+ RecordedMutationsPlayer(SolrCloudManager scm, String collectionName, boolean isCollectionCreation, List<Pair<MutatingCommand, ZkNodeProps>> mutations) {
+ this.scm = scm;
+ this.collectionName = collectionName;
+ this.isCollectionCreation = isCollectionCreation;
+ this.mutations = mutations;
+ }
+
+ @Override
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ @Override
+ public boolean isCollectionCreation() {
+ return isCollectionCreation;
+ }
+
+ @Override
+ public void computeUpdates(ClusterState clusterState) {
+ boolean hasJsonUpdates = false;
+ List<PerReplicaStatesOps> perReplicaStateOps = new LinkedList<>();
+ for (Pair<MutatingCommand, ZkNodeProps> mutation : mutations) {
+ MutatingCommand mutatingCommand = mutation.first();
+ ZkNodeProps message = mutation.second();
+ try {
+ ZkWriteCommand zkcmd = mutatingCommand.buildWriteCommand(scm, clusterState, message);
+ if (zkcmd != ZkStateWriter.NO_OP) {
+ hasJsonUpdates = true;
+ clusterState = clusterState.copyWith(zkcmd.name, zkcmd.collection);
+ }
+ if (zkcmd.ops != null && zkcmd.ops.get() != null) {
+ perReplicaStateOps.add(zkcmd.ops);
+ }
+ } catch (Exception e) {
+ // Seems weird to skip rather than fail, but that's what Overseer is doing (see ClusterStateUpdater.processQueueItem()).
+ // Maybe in the new distributed update world we should make the caller fail? (something Overseer cluster state updater can't do)
+ // To be reconsidered once Collection API commands are distributed because then cluster updates are done synchronously and
+ // have the opportunity to make the Collection API call fail directly.
+ log.error("Distributed cluster state update could not process the current clusterstate state update message, skipping the message: {}", message, e);
+ }
+ }
+
+ computedState = hasJsonUpdates ? clusterState : null;
+ replicaOpsList = perReplicaStateOps.isEmpty() ? null : perReplicaStateOps;
+ }
+
+ @Override
+ public ClusterState getUpdatedClusterState() {
+ return computedState;
+ }
+
+ @Override
+ public List<PerReplicaStatesOps> getPerReplicaStatesOps() {
+ return replicaOpsList;
+ }
+ }
+
+ /**
+ * Using optimistic locking (and retries when needed) updates Zookeeper with the changes previously recorded by calls
+ * to {@link #record(MutatingCommand, ZkNodeProps)}.
+ */
+ public void executeStateUpdates(SolrCloudManager scm, ZkStateReader zkStateReader) throws KeeperException, InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Executing updates for collection " + collectionName + ", is creation=" + isCollectionCreation + ", " + mutations.size() + " recorded mutations.", new Exception("StackTraceOnly")); // nowarn
+ }
+ if (mutations.isEmpty()) {
+ final String err = "Internal bug. Unexpected empty set of mutations to apply for collection " + collectionName;
+ log.error(err);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+ }
+
+ RecordedMutationsPlayer mutationPlayer = new RecordedMutationsPlayer(scm, collectionName, isCollectionCreation, mutations);
+ ZkUpdateApplicator.applyUpdate(zkStateReader, mutationPlayer);
+
+ // TODO update stats here for the various commands executed successfully or not?
+ // This would replace the stats about cluster state updates that the Collection API currently makes available using
+ // the OVERSEERSTATUS command, but obviously would be per node and will not have stats about queues (since there
+ // will be no queues). Would be useful in some tests though, for example TestSkipOverseerOperations.
+ // Probably better to rethink what types of stats are expected from a distributed system rather than trying to present
+ // those previously provided by a central server in the system (the Overseer).
+ }
+ }
+
+ /**
+ * This class handles the changes to be made as a result of a {@link OverseerAction#DOWNNODE} event.<p>
+ *
+ * Instances of this class deal with a single collection. Static method {@link #executeNodeDownStateUpdate} is the entry point
+ * dealing with a node going down and processing all collections.
+ */
+ private static class CollectionNodeDownChangeCalculator implements StateChangeCalculator {
+ private final String collectionName;
+ private final String nodeName;
+
+ // null means no update to state.json needed. Set in computeUpdates()
+ private ClusterState computedState = null;
+
+ // null means no updates needed to the per replica state znodes. Set in computeUpdates()
+ private List<PerReplicaStatesOps> replicaOpsList = null;
+
+ /**
+ * Entry point to mark all replicas of all collections present on a single node as being DOWN (because the node is down)
+ */
+ public static void executeNodeDownStateUpdate(String nodeName, ZkStateReader zkStateReader) {
+ // This code does a version of what NodeMutator.downNode() is doing. We can't assume we have a cache of the collections,
+ // so we're going to read all of them from ZK, fetch the state.json for each and if it has any replicas on the
+ // failed node, do an update (conditional of course) of the state.json
+
+ // For Per Replica States collections there is still a need to read state.json, but the update of state.json is replaced
+ // by a few znode deletions and creations. Might be faster or slower overall, depending on the number of impacted
+ // replicas of such a collection and the total size of that collection's state.json.
+
+ // Note code here also has to duplicate some of the work done in ZkStateReader because ZkStateReader couples reading of
+ // the cluster state and maintaining a cached copy of the cluster state. Something likely to be refactored later (once
+ // Overseer is totally removed and Zookeeper access patterns become clearer).
+
+ log.debug("DownNode state change invoked for node: {}", nodeName);
+
+ try {
+ final List<String> collectionNames = zkStateReader.getZkClient().getChildren(COLLECTIONS_ZKNODE, null, true);
+
+ // Collections are totally independent of each other. Multiple threads could share the load here (need a ZK connection for each though).
+ for (String collectionName : collectionNames) {
+ CollectionNodeDownChangeCalculator collectionUpdater = new CollectionNodeDownChangeCalculator(collectionName, nodeName);
+ ZkUpdateApplicator.applyUpdate(zkStateReader, collectionUpdater);
+ }
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ // Overseer behavior is to log an error and carry on when a message fails. See Overseer.ClusterStateUpdater.processQueueItem()
+ log.error("Could not successfully process DOWNNODE, giving up", e);
+ }
+ }
+
+ private CollectionNodeDownChangeCalculator(String collectionName, String nodeName) {
+ this.collectionName = collectionName;
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public String getCollectionName() {
+ return collectionName;
+ }
+
+ @Override
+ public boolean isCollectionCreation() {
+ return false;
+ }
+
+ @Override
+ public void computeUpdates(ClusterState clusterState) {
+ final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
+ Optional<ZkWriteCommand> result = docCollection != null ? NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection) : Optional.empty();
+
+ if (docCollection == null) {
+ // This is possible but should be rare. Logging warn in case it is seen often and likely a sign of another issue
+ log.warn("Processing DOWNNODE, collection " + collectionName + " disappeared during iteration"); // nowarn
+ }
+
+ if (result.isPresent()) {
+ ZkWriteCommand zkcmd = result.get();
+ computedState = (zkcmd != ZkStateWriter.NO_OP) ? clusterState.copyWith(zkcmd.name, zkcmd.collection) : null;
+ replicaOpsList = (zkcmd.ops != null && zkcmd.ops.get() != null) ? Collections.singletonList(zkcmd.ops) : null;
+ } else {
+ computedState = null;
+ replicaOpsList = null;
+ }
+ }
+
+ @Override
+ public ClusterState getUpdatedClusterState() {
+ return computedState;
+ }
+
+ @Override
+ public List<PerReplicaStatesOps> getPerReplicaStatesOps() {
+ return replicaOpsList;
+ }
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a3fd760..e131d8c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -596,8 +596,6 @@ public class Overseer implements SolrCloseable {
private OverseerThread updaterThread;
- private OverseerThread triggerThread;
-
private final ZkStateReader reader;
private final HttpShardHandler shardHandler;
@@ -616,6 +614,7 @@ public class Overseer implements SolrCloseable {
private volatile boolean systemCollCompatCheck = true;
private CloudConfig config;
+ private final DistributedClusterStateUpdater distributedClusterStateUpdater;
// overseer not responsible for closing reader
public Overseer(HttpShardHandler shardHandler,
@@ -629,6 +628,7 @@ public class Overseer implements SolrCloseable {
this.zkController = zkController;
this.stats = new Stats();
this.config = config;
+ this.distributedClusterStateUpdater = new DistributedClusterStateUpdater(config.getDistributedClusterStateUpdates());
this.solrMetricsContext = new SolrMetricsContext(zkController.getCoreContainer().getMetricManager(), SolrInfoBean.Group.overseer.toString(), metricTag);
}
@@ -650,7 +650,9 @@ public class Overseer implements SolrCloseable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory());
+ // Below is the only non test usage of the "cluster state update" queue even when distributed cluster state updates are enabled.
+ // That queue is used to tell the Overseer to quit. As long as we have an Overseer, we need to support this.
+ OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, this, adminPath, shardHandler.getShardHandlerFactory());
overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer, solrMetricsContext);
ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
ccThread.setDaemon(true);
@@ -814,6 +816,10 @@ public class Overseer implements SolrCloseable {
return zkController.getSolrCloudManager();
}
+ public DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
+ return distributedClusterStateUpdater;
+ }
+
/**
* For tests.
*
@@ -824,15 +830,6 @@ public class Overseer implements SolrCloseable {
return updaterThread;
}
- /**
- * For tests.
- * @lucene.internal
- * @return trigger thread
- */
- public synchronized OverseerThread getTriggerThread() {
- return triggerThread;
- }
-
public synchronized void close() {
if (this.id != null) {
log.info("Overseer (id={}) closing", id);
@@ -863,10 +860,6 @@ public class Overseer implements SolrCloseable {
IOUtils.closeQuietly(ccThread);
ccThread.interrupt();
}
- if (triggerThread != null) {
- IOUtils.closeQuietly(triggerThread);
- triggerThread.interrupt();
- }
if (updaterThread != null) {
try {
updaterThread.join();
@@ -877,14 +870,8 @@ public class Overseer implements SolrCloseable {
ccThread.join();
} catch (InterruptedException e) {}
}
- if (triggerThread != null) {
- try {
- triggerThread.join();
- } catch (InterruptedException e) {}
- }
updaterThread = null;
ccThread = null;
- triggerThread = null;
}
/**
@@ -904,6 +891,17 @@ public class Overseer implements SolrCloseable {
* @return a {@link ZkDistributedQueue} object
*/
ZkDistributedQueue getStateUpdateQueue() {
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ throw new IllegalStateException("Cluster state is done in a distributed way, should not try to access ZK queue");
+ }
+ return getStateUpdateQueue(new Stats());
+ }
+
+ /**
+ * Separated into its own method from {@link #getStateUpdateQueue()} that does the same thing because this one is legit
+ * to call even when cluster state updates are distributed whereas the other one is not.
+ */
+ ZkDistributedQueue getOverseerQuitNotificationQueue() {
return getStateUpdateQueue(new Stats());
}
@@ -1064,6 +1062,14 @@ public class Overseer implements SolrCloseable {
}
public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
+ // When cluster state update is distributed, the Overseer cluster state update queue should only ever receive QUIT messages.
+ // These go to sendQuitToOverseer for execution path clarity.
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ final ZkNodeProps message = ZkNodeProps.load(data);
+ final String operation = message.getStr(QUEUE_OPERATION);
+ log.error("Received unexpected message on Overseer cluster state updater for " + operation + " when distributed updates are configured"); // nowarn
+ throw new RuntimeException("Message " + operation + " offered to state update queue when distributed state update is configured.");
+ }
if (zkController.getZkClient().isClosed()) {
throw new AlreadyClosedException();
}
@@ -1080,7 +1086,16 @@ public class Overseer implements SolrCloseable {
public interface Message {
ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
-
}
+ /**
+ * This method enqueues a QUIT message to the overseer of given id.
+ * Effect is similar to building the message then calling {@link #offerStateUpdate} but this method can legitimately be called
+ * when cluster state update is distributed (and Overseer cluster state updater not really used) while {@link #offerStateUpdate} is not.
+ * Note that sending "QUIT" to overseer is not a cluster state update and was likely added to this queue because it was simpler.
+ */
+ public void sendQuitToOverseer(String overseerId) throws KeeperException, InterruptedException {
+ getOverseerQuitNotificationQueue().offer(
+ Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(), ID, overseerId)));
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
index d532f03..a027a05 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
@@ -20,9 +20,7 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Map;
-import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -35,8 +33,6 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.params.CommonParams.ID;
-
/**
* Responsible for prioritization of Overseer nodes, for example with the
* ADDROLE collection command.
@@ -49,13 +45,16 @@ public class OverseerNodePrioritizer {
private final String adminPath;
private final ShardHandlerFactory shardHandlerFactory;
- private ZkDistributedQueue stateUpdateQueue;
+ /**
+ * Only used to send QUIT to the overseer
+ */
+ private final Overseer overseer;
- public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+ public OverseerNodePrioritizer(ZkStateReader zkStateReader, Overseer overseer, String adminPath, ShardHandlerFactory shardHandlerFactory) {
this.zkStateReader = zkStateReader;
this.adminPath = adminPath;
this.shardHandlerFactory = shardHandlerFactory;
- this.stateUpdateQueue = stateUpdateQueue;
+ this.overseer = overseer;
}
public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
@@ -95,10 +94,7 @@ public class OverseerNodePrioritizer {
invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
}
//now ask the current leader to QUIT , so that the designate can takeover
- stateUpdateQueue.offer(
- Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
- ID, OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
-
+ overseer.sendQuitToOverseer(OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()));
}
private void invokeOverseerOp(String electionNode, String op) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index 2f221af..c2053f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -24,7 +24,6 @@ import org.apache.zookeeper.data.Stat;
/**
* Refresh the Cluster State for a given collection
- *
*/
public class RefreshCollectionMessage implements Overseer.Message {
public final String collection;
@@ -44,7 +43,7 @@ public class RefreshCollectionMessage implements Overseer.Message {
//our state is up to date
return clusterState;
} else {
- coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
+ coll = overseer.getZkStateReader().getCollectionLive(collection);
return clusterState.copyWith(collection, coll);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 68b062e..9856198 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -51,16 +51,17 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
private final CoreContainer cc;
private final SyncStrategy syncStrategy;
+ private final DistributedClusterStateUpdater distributedClusterStateUpdater;
private volatile boolean isClosed = false;
public ShardLeaderElectionContext(LeaderElector leaderElector,
final String shardId, final String collection,
final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
- super(leaderElector, shardId, collection, coreNodeName, props,
- zkController);
+ super(leaderElector, shardId, collection, coreNodeName, props, zkController);
this.cc = cc;
- syncStrategy = new SyncStrategy(cc);
+ this.syncStrategy = new SyncStrategy(cc);
+ this.distributedClusterStateUpdater = zkController.getDistributedClusterStateUpdater();
}
@Override
@@ -116,7 +117,13 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
- zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceSetShardLeader, m,
+ zkController.getSolrCloudManager(), zkStateReader);
+ } else {
+ zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ }
}
if (!weAreReplacement) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 531294a..2118ad0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -175,8 +175,13 @@ class ShardLeaderElectionContextBase extends ElectionContext {
assert zkController != null;
assert zkController.getOverseer() != null;
DocCollection coll = zkStateReader.getCollection(this.collection);
- if (coll == null || ZkController.sendToOverseer(coll, id)) {
- zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+ if (coll == null || ZkController.updateStateDotJson(coll, id)) {
+ if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceSetShardLeader, m,
+ zkController.getSolrCloudManager(), zkStateReader);
+ } else {
+ zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+ }
} else {
PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
PerReplicaStatesOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f5d6928..232b045 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -196,6 +196,8 @@ public class ZkController implements Closeable {
private final CloudConfig cloudConfig;
private final NodesSysPropsCacher sysPropsCacher;
+ private final DistributedClusterStateUpdater distributedClusterStateUpdater;
+
private LeaderElector overseerElector;
private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
@@ -285,6 +287,9 @@ public class ZkController implements Closeable {
this.cloudConfig = cloudConfig;
+ // Use the configured way to do cluster state update (Overseer queue vs distributed)
+ distributedClusterStateUpdater = new DistributedClusterStateUpdater(cloudConfig.getDistributedClusterStateUpdates());
+
this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames();
// be forgiving and strip this off leading/trailing slashes
@@ -464,7 +469,11 @@ public class ZkController implements Closeable {
init();
- this.overseerJobQueue = overseer.getStateUpdateQueue();
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ this.overseerJobQueue = null;
+ } else {
+ this.overseerJobQueue = overseer.getStateUpdateQueue();
+ }
this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(),
@@ -757,6 +766,10 @@ public class ZkController implements Closeable {
return zkStateReader.getClusterState();
}
+ public DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
+ return distributedClusterStateUpdater;
+ }
+
public SolrCloudManager getSolrCloudManager() {
if (cloudManager != null) {
return cloudManager;
@@ -1533,7 +1546,7 @@ public class ZkController implements Closeable {
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
Map<String,Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, "state");
+ props.put(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower());
props.put(ZkStateReader.STATE_PROP, state.toString());
props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
@@ -1585,8 +1598,13 @@ public class ZkController implements Closeable {
cd.getCloudDescriptor().setLastPublished(state);
}
DocCollection coll = zkStateReader.getCollection(collection);
- if (forcePublish || sendToOverseer(coll, coreNodeName)) {
- overseerJobQueue.offer(Utils.toJSON(m));
+ if (forcePublish || updateStateDotJson(coll, coreNodeName)) {
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState, m,
+ getSolrCloudManager(), zkStateReader);
+ } else {
+ overseerJobQueue.offer(Utils.toJSON(m));
+ }
} else {
if (log.isDebugEnabled()) {
log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
@@ -1601,9 +1619,9 @@ public class ZkController implements Closeable {
}
/**
- * Whether a message needs to be sent to overseer or not
+ * Returns {@code true} if a message needs to be sent to overseer (or done in a distributed way) to update state.json for the collection
*/
- static boolean sendToOverseer(DocCollection coll, String replicaName) {
+ static boolean updateStateDotJson(DocCollection coll, String replicaName) {
if (coll == null) return true;
if (!coll.isPerReplicaState()) return true;
Replica r = coll.getReplica(replicaName);
@@ -1663,22 +1681,20 @@ public class ZkController implements Closeable {
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
if (removeCoreFromZk) {
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+ ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.NODE_NAME_PROP, getNodeName(),
ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
- overseerJobQueue.offer(Utils.toJSON(m));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+ getSolrCloudManager(), zkStateReader);
+ } else {
+ overseerJobQueue.offer(Utils.toJSON(m));
+ }
}
}
- public void createCollection(String collection) throws Exception {
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, getNodeName(),
- ZkStateReader.COLLECTION_PROP, collection);
- overseerJobQueue.offer(Utils.toJSON(m));
- }
-
public ZkStateReader getZkStateReader() {
return zkStateReader;
}
@@ -2036,6 +2052,9 @@ public class ZkController implements Closeable {
}
public ZkDistributedQueue getOverseerJobQueue() {
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ throw new IllegalStateException("Cluster is configured with distributed state update, not expecting the queue to be retrieved");
+ }
return overseerJobQueue;
}
@@ -2632,17 +2651,26 @@ public class ZkController implements Closeable {
*/
public void publishNodeAsDown(String nodeName) {
log.info("Publish node={} as DOWN", nodeName);
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
- ZkStateReader.NODE_NAME_PROP, nodeName);
- try {
- overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
- } catch (AlreadyClosedException e) {
- log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.debug("Publish node as down was interrupted.");
- } catch (KeeperException e) {
- log.warn("Could not publish node as down: ", e);
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ // Note that with the current implementation, when distributed cluster state updates are enabled, we mark the node
+ // down synchronously from this thread, whereas the Overseer cluster state update frees this thread right away and
+ // the Overseer will async mark the node down by updating all affected collections.
+ // If this is an issue (i.e. takes too long), then the call below should be executed from another thread so that
+ // the calling thread can immediately return.
+ distributedClusterStateUpdater.executeNodeDownStateUpdate(nodeName, zkStateReader);
+ } else {
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
+ ZkStateReader.NODE_NAME_PROP, nodeName);
+ try {
+ overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
+ } catch (AlreadyClosedException e) {
+ log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.debug("Publish node as down was interrupted.");
+ } catch (KeeperException e) {
+ log.warn("Could not publish node as down: ", e);
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 6e34b0f..a655b76 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.common.SolrCloseableLatch;
@@ -197,8 +198,6 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
}
private ModifiableSolrParams getReplicaParams(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, String collectionName, DocCollection coll, boolean skipCreateReplicaInClusterState, String asyncId, ShardHandler shardHandler, CreateReplica createReplica) throws IOException, InterruptedException, KeeperException {
- ModifiableSolrParams params = new ModifiableSolrParams();
-
ZkStateReader zkStateReader = ocmh.zkStateReader;
if (!skipCreateReplicaInClusterState) {
ZkNodeProps props = new ZkNodeProps(
@@ -212,12 +211,19 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
if (createReplica.coreNodeName != null) {
props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
}
- try {
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
- } catch (Exception e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ try {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
+ }
}
}
+
+ ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CoreAdminParams.CORE_NODE_NAME,
ocmh.waitToSeeReplicasInState(collectionName, Collections.singleton(createReplica.coreName)).get(createReplica.coreName).getName());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
index 611bd2d..fb41150 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -74,7 +74,7 @@ abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
new LocalSolrQueryRequest(null, createReqParams),
null,
ocmh.overseer.getCoreContainer().getCollectionsHandler());
- createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
+ createMsgMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
NamedList results = new NamedList();
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index d95501e..c10e9ca 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.NotEmptyException;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.VersionedData;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.RefreshCollectionMessage;
import org.apache.solr.cloud.ZkController;
@@ -145,17 +146,35 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
createCollectionZkNode(stateManager, collectionName, collectionParams);
+ // Note that in code below there are two main execution paths: Overseer based cluster state updates and distributed
+ // cluster state updates (look for isDistributedStateUpdate() conditions).
+ //
+ // PerReplicaStates (PRS) collections follow a hybrid approach. Even when the cluster is Overseer cluster state update based,
+ // these collections are created locally then the cluster state updater is notified (look for usage of RefreshCollectionMessage).
+ // This explains why PRS collections have fewer diverging execution paths between distributed or Overseer based cluster state updates.
+
if (isPRS) {
// In case of a PRS collection, create the collection structure directly instead of resubmitting
// to the overseer queue.
// TODO: Consider doing this for all collections, not just the PRS collections.
+ // TODO comment above achieved by switching the cluster to distributed state updates
+
+ // This code directly updates Zookeeper by creating the collection state.json. It is compatible with both distributed
+ // cluster state updates and Overseer based cluster state updates.
ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
} else {
- ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // The message has been crafted by CollectionsHandler.CollectionOperation.CREATE_OP and defines the QUEUE_OPERATION
+ // to be CollectionParams.CollectionAction.CREATE.
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ClusterCreateCollection, message,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+ }
// wait for a while until we see the collection
TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
@@ -169,14 +188,13 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
}
- // refresh cluster state
+ // refresh cluster state (value read below comes from Zookeeper watch firing following the update done previously,
+ // be it by Overseer or by this thread when updates are distributed)
clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
newColl = clusterState.getCollection(collectionName);
-
}
-
- List<ReplicaPosition> replicaPositions = null;
+ final List<ReplicaPosition> replicaPositions;
try {
replicaPositions = buildReplicaPositions(ocmh.overseer.getCoreContainer(), ocmh.cloudManager, clusterState, newColl,
message, shardNames);
@@ -199,6 +217,17 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+ final DistributedClusterStateUpdater.StateChangeRecorder scr;
+
+ // PRS collections update Zookeeper directly, so even if we run in distributed state update,
+ // there's nothing to update in state.json for such collection in the loop over replica positions below.
+ if (!isPRS && ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // The collection got created. Now we're adding replicas (and will update ZK only once when done adding).
+ scr = ocmh.getDistributedClusterStateUpdater().createStateChangeRecorder(collectionName, false);;
+ } else {
+ scr = null;
+ }
+
for (ReplicaPosition replicaPosition : replicaPositions) {
String nodeName = replicaPosition.node;
@@ -226,6 +255,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
// to the overseer queue.
// TODO: Consider doing this for all collections, not just the PRS collections.
+
+ // This PRS specific code is compatible with both Overseer and distributed cluster state update strategies
ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
// log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
@@ -233,7 +264,11 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
} else {
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ }
}
// Need to create new params for each request
@@ -266,14 +301,20 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
coresToCreate.put(coreName, sreq);
}
- // wait for all replica entries to be created
- Map<String, Replica> replicas ;
+ // PRS collections did their own thing and we didn't create a StateChangeRecorder for them
+ if (!isPRS && ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // Add the replicas to the collection state (all at once after the loop above)
+ scr.executeStateUpdates(ocmh.cloudManager, ocmh.zkStateReader);
+ }
+
+ final Map<String, Replica> replicas;
if (isPRS) {
replicas = new ConcurrentHashMap<>();
newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coresToCreate.containsKey(r.getCoreName())) // Only the elements that were asked for...
.forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
} else {
+ // wait for all replica entries to be created and visible in local cluster state (updated by ZK watches)
replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
}
@@ -286,7 +327,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
@SuppressWarnings({"rawtypes"})
boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
- if(isPRS) {
+ if (isPRS) {
TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
while (!timeout.hasTimedOut()) {
@@ -299,9 +340,14 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
} else {
failure = true;
}
- // Now ask Overseer to fetch the latest state of collection
- // from ZK
- ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+ // When cluster state updates are distributed, Overseer state updater is not used and doesn't have to be notified
+ // of a new collection created elsewhere (which is how all collections are created).
+ // Note it is likely possible to skip the whole if (isPRS) block, but keeping distributed state updates as
+ // close in behavior to Overseer state updates for now.
+ if (!ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // Now ask Overseer to fetch the latest state of collection from ZK
+ ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+ }
}
if (failure) {
// Let's cleanup as we hit an exception
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 989003a..04eb0f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -78,10 +79,17 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
}
- //ZkStateReader zkStateReader = ocmh.zkStateReader;
- ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
- // wait for a while until we see the shard
- //ocmh.waitForNewShard(collectionName, sliceName);
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // The message has been crafted by CollectionsHandler.CollectionOperation.CREATESHARD_OP and defines the QUEUE_OPERATION
+ // to be CollectionParams.CollectionAction.CREATESHARD.
+ // Likely a bug here (distributed or Overseer based) as we use the collection alias name and not the real name?
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionCreateShard, message,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ // message contains extCollectionName that might be an alias. Unclear (to me) how this works in that case.
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+ }
+
// wait for a while until we see the shard and update the local view of the cluster state
clusterState = ocmh.waitForNewShard(collectionName, sliceName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 3b7eb93..4d2129f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.NonExistentCoreException;
import org.apache.solr.common.SolrException;
@@ -41,7 +42,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
@@ -63,11 +63,9 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
private static final Set<String> okayExceptions = Collections.singleton(NonExistentCoreException.class.getName());
private final OverseerCollectionMessageHandler ocmh;
- private final TimeSource timeSource;
public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
this.ocmh = ocmh;
- this.timeSource = ocmh.cloudManager.getTimeSource();
}
@Override
@@ -150,7 +148,12 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ClusterDeleteCollection, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
// wait for a while until we don't see the collection
zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, Objects::isNull);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index ff7edfa..79948c2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
@@ -97,7 +98,19 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(sliceId, Slice.State.CONSTRUCTION.toString());
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // In this DeleteShardCmd.call() method there are potentially two cluster state updates. This is the first one.
+ // Even though the code of this method does not wait for it to complete, it does call the Collection API before
+ // it issues the second state change below. The collection API will be doing its own state change(s), and these will
+ // happen after this one (given it's for the same collection). Therefore we persist this state change
+ // immediately and do not group it with the one done further down.
+ // Once the Collection API is also distributed (and not only the cluster state updates), we will likely be able
+ // to batch more/all cluster state updates done by this command (DeleteShardCmd). TODO SOLR-15146
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
}
String asyncId = message.getStr(ASYNC);
@@ -144,7 +157,12 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
ZkStateReader zkStateReader = ocmh.zkStateReader;
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionDeleteShard, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index 2b094b2..1b4b019 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -201,7 +202,12 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
"targetCollection", targetCollection.getName(),
"expireAt", RoutingRule.makeExpiryAt(timeout));
log.info("Adding routing rule: {}", m);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddRoutingRule, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
// wait for a while until we see the new rule
log.info("Waiting to see routing rule updated in clusterstate");
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 53a02fa..c66a65f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.LockTree;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerMessageHandler;
@@ -149,6 +150,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
String myId;
Stats stats;
TimeSource timeSource;
+ private final DistributedClusterStateUpdater distributedClusterStateUpdater;
// Set that tracks collections that are currently being processed by a running task.
// This is used for handling mutual exclusion of the tasks.
@@ -186,6 +188,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
this.myId = myId;
this.stats = stats;
this.overseer = overseer;
+ this.distributedClusterStateUpdater = overseer.getDistributedClusterStateUpdater();
this.cloudManager = overseer.getSolrCloudManager();
this.timeSource = cloudManager.getTimeSource();
this.isClosed = false;
@@ -267,6 +270,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
return new OverseerSolrResponse(results);
}
+ DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
+ return distributedClusterStateUpdater;
+ }
+
@SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
@SuppressWarnings({"unchecked"})
private void mockOperation(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws InterruptedException {
@@ -328,23 +335,31 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
- SolrZkClient zkClient = zkStateReader.getZkClient();
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
propMap.putAll(message.getProperties());
ZkNodeProps m = new ZkNodeProps(propMap);
- overseer.offerStateUpdate(Utils.toJSON(m));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ReplicaAddReplicaProperty, m,
+ cloudManager, zkStateReader);
+ } else {
+ overseer.offerStateUpdate(Utils.toJSON(m));
+ }
}
private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
- SolrZkClient zkClient = zkStateReader.getZkClient();
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
propMap.putAll(message.getProperties());
ZkNodeProps m = new ZkNodeProps(propMap);
- overseer.offerStateUpdate(Utils.toJSON(m));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ReplicaDeleteReplicaProperty, m,
+ cloudManager, zkStateReader);
+ } else {
+ overseer.offerStateUpdate(Utils.toJSON(m));
+ }
}
private void balanceProperty(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
@@ -353,11 +368,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
"The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
"' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
}
- SolrZkClient zkClient = zkStateReader.getZkClient();
Map<String, Object> m = new HashMap<>();
m.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
m.putAll(message.getProperties());
- overseer.offerStateUpdate(Utils.toJSON(m));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.BalanceShardsUnique, new ZkNodeProps(m),
+ cloudManager, zkStateReader);
+ } else {
+ overseer.offerStateUpdate(Utils.toJSON(m));
+ }
}
/**
@@ -424,7 +443,12 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replicaName);
- overseer.offerStateUpdate(Utils.toJSON(m));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+ cloudManager, zkStateReader);
+ } else {
+ overseer.offerStateUpdate(Utils.toJSON(m));
+ }
}
void checkRequired(ZkNodeProps message, String... props) {
@@ -554,7 +578,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
}
- overseer.offerStateUpdate(Utils.toJSON(message));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ // Apply the state update right away. The wait will still be useful for the change to be visible in the local cluster state (watchers have fired).
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, message,
+ cloudManager, zkStateReader);
+ } else {
+ overseer.offerStateUpdate(Utils.toJSON(message));
+ }
try {
zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS, c -> {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
index 7bc51c9..f0a04d2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
@@ -60,6 +60,16 @@ public class OverseerStatusCmd implements OverseerCollectionMessageHandler.Cmd {
zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
results.add("overseer_collection_queue_size", stat.getNumChildren());
+ // Overseer reported stats below are tracked in the Overseer cluster state updater when it performs certain operations.
+ // Sharing the ocmh.stats variable between the cluster state updater and the Collection API (this command) is by the way
+ // about the only thing that ties the cluster state updater to the collection api message handler and that takes
+ // advantage of the fact that both run on the same node (the Overseer node). (recently added PerReplicaStates also
+ // take advantage of this through method Overseer.submit()).
+ // When distributed updates are enabled, cluster state updates are not done by the Overseer (it doesn't even see them)
+ // and therefore can't report them. The corresponding data in OVERSEERSTATUS (all data built below) is no longer returned.
+ // This means top level keys "overseer_operations", "collection_operations", "overseer_queue", "overseer_internal_queue"
+ // and "collection_queue" are either empty or do not contain all expected information when cluster state updates are distributed.
+
@SuppressWarnings({"rawtypes"})
NamedList overseerStats = new NamedList();
@SuppressWarnings({"rawtypes"})
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index c45c772..30cfa03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -370,7 +371,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, "true");
- ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, cmd,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+ }
TestInjection.injectReindexLatch();
@@ -477,14 +483,24 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, null);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, props,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ }
}
// 9. set FINISHED state on the target and clear the state on the source
ZkNodeProps props = new ZkNodeProps(
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, targetCollection,
REINDEXING_STATE, State.FINISHED.toLower());
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, props,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ }
reindexingState.put(STATE, State.FINISHED.toLower());
reindexingState.put(PHASE, "done");
@@ -804,7 +820,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.READ_ONLY, null);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, props,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ }
removeReindexingState(collection);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 48a584b..774e054 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -17,7 +17,26 @@
package org.apache.solr.cloud.api.collections;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -48,25 +67,6 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
import static org.apache.solr.common.cloud.ZkStateReader.*;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
@@ -343,7 +343,12 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
}
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollection.getName());
- ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, new ZkNodeProps(propMap),
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ }
}
private List<ReplicaPosition> getReplicaPositions(DocCollection restoreCollection, List<String> nodeList, ClusterState clusterState, List<String> sliceNames) throws IOException, InterruptedException {
@@ -461,7 +466,12 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
for (Slice shard : restoreCollection.getSlices()) {
propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
}
- ocmh.overseer.offerStateUpdate((Utils.toJSON(new ZkNodeProps(propMap))));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, new ZkNodeProps(propMap),
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ }
}
private void addReplicasToShards(@SuppressWarnings({"rawtypes"}) NamedList results,
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index ff9df3b..2c0ad69 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -23,6 +23,7 @@ import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.VersionedData;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -245,7 +246,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
// delete the shards
log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
+ propMap.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, subSlice);
ZkNodeProps m = new ZkNodeProps(propMap);
@@ -288,7 +289,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
propMap.put("shard_parent_node", nodeName);
propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
- ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionCreateShard, new ZkNodeProps(propMap),
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ }
// wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state
clusterState = ocmh.waitForNewShard(collectionName, subSlice);
@@ -442,6 +448,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
t.stop();
t = timings.sub("createReplicaPlaceholders");
+ final DistributedClusterStateUpdater.StateChangeRecorder scr;
+ boolean hasRecordedDistributedUpdate = false;
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ scr = ocmh.getDistributedClusterStateUpdater().createStateChangeRecorder(collectionName, false);
+ } else {
+ scr = null;
+ }
for (ReplicaPosition replicaPosition : replicaPositions) {
String sliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
@@ -462,7 +475,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
ZkStateReader.NODE_NAME_PROP, subShardNodeName,
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ hasRecordedDistributedUpdate = true;
+ scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ }
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -488,6 +506,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
replicas.add(propMap);
}
+ if (hasRecordedDistributedUpdate && ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // Actually add the replicas to the collection state. Note that when Overseer takes care of the state,
+ // there is no wait here for the state update to be visible, but with distributed state updates done synchronously
+ // we do wait (we could in theory create a thread and have it do the work if we REALLY needed to, but we likely don't).
+ scr.executeStateUpdates(ocmh.cloudManager, ocmh.zkStateReader);
+ }
t.stop();
assert TestInjection.injectSplitFailureBeforeReplicaCreation();
@@ -504,12 +528,17 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
if (leaderZnodeStat == null) {
// the leader is not live anymore, fail the split!
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
- } else if (ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
+ } else {
// there's a new leader, fail the split!
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"The zk session id for the shard leader node: " + parentShardLeader.getNodeName() + " has changed from "
@@ -538,7 +567,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
} else {
log.info("Requesting shard state be set to 'recovery'");
Map<String, Object> propMap = new HashMap<>();
@@ -548,7 +582,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
ZkNodeProps m = new ZkNodeProps(propMap);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
}
t = timings.sub("createCoresForReplicas");
@@ -709,7 +748,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
if (sendUpdateState) {
try {
ZkNodeProps m = new ZkNodeProps(propMap);
- ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+ ocmh.cloudManager, ocmh.zkStateReader);
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+ }
} catch (Exception e) {
// don't give up yet - just log the error, we may still be able to clean up
log.warn("Cleanup failed after failed split of {}/{}: (slice state changes)", collectionName, parentShard, e);
@@ -724,7 +768,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
}
log.debug("- sub-shard: {} exists therefore requesting its deletion", subSlice);
HashMap<String, Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, "deleteshard");
+ props.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower());
props.put(COLLECTION_PROP, collectionName);
props.put(SHARD_ID_PROP, subSlice);
ZkNodeProps m = new ZkNodeProps(props);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 073bdaf..94c89a0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -26,14 +26,12 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.cloud.*;
-import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.params.CommonParams.NAME;
public class CollectionMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -155,24 +153,9 @@ public class CollectionMutator {
}
public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
- DocCollection newCollection = null;
- Map<String, Slice> slices;
-
- if (collection == null) {
- // when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
- // without explicitly creating a collection. In this current case, we assume custom sharding with an "implicit" router.
- slices = new LinkedHashMap<>(1);
- slices.put(slice.getName(), slice);
- Map<String, Object> props = new HashMap<>(1);
- props.put(DocCollection.DOC_ROUTER, Utils.makeMap(NAME, ImplicitDocRouter.NAME));
- newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
- } else {
- slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
- slices.put(slice.getName(), slice);
- newCollection = collection.copyWithSlices(slices);
- }
-
- return newCollection;
+ Map<String, Slice> slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
+ slices.put(slice.getName(), slice);
+ return collection.copyWithSlices(slices);
}
static boolean checkCollectionKeyExistence(ZkNodeProps message) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 48e0a16..adf146d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Optional;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,57 +40,74 @@ public class NodeMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
- List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
log.debug("DownNode state invoked for node: {}", nodeName);
+ List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
+
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
- List<String> downedReplicas = new ArrayList<>();
-
- String collection = entry.getKey();
+ String collectionName = entry.getKey();
DocCollection docCollection = entry.getValue();
- Map<String,Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
-
- boolean needToUpdateCollection = false;
- for (Entry<String, Slice> sliceEntry : slicesCopy.entrySet()) {
- Slice slice = sliceEntry.getValue();
- Map<String, Replica> newReplicas = slice.getReplicasCopy();
-
- Collection<Replica> replicas = slice.getReplicas();
- for (Replica replica : replicas) {
- String rNodeName = replica.getNodeName();
- if (rNodeName == null) {
- throw new RuntimeException("Replica without node name! " + replica);
- }
- if (rNodeName.equals(nodeName)) {
- log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
- Map<String, Object> props = replica.shallowCopy();
- Replica newReplica = new Replica(replica.getName(), replica.node, replica.collection, slice.getName(), replica.core,
- Replica.State.DOWN, replica.type, props);
- newReplicas.put(replica.getName(), newReplica);
- needToUpdateCollection = true;
- downedReplicas.add(replica.getName());
- }
- }
+ Optional<ZkWriteCommand> zkWriteCommand = computeCollectionUpdate(nodeName, collectionName, docCollection);
- Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy(),collection);
- slicesCopy.put(slice.getName(), newSlice);
+ if (zkWriteCommand.isPresent()) {
+ zkWriteCommands.add(zkWriteCommand.get());
}
+ }
+
+ return zkWriteCommands;
+ }
+
+ /**
+ * Returns the write command needed to mark as down the replicas of the given collection that are hosted on the given (down) node.
+ * @return An optional with the write command or an empty one if the collection does not need any state modification.
+ * The returned write command might be for per replica state updates or for an update to state.json, depending on the
+ * configuration of the collection.
+ */
+ public static Optional<ZkWriteCommand> computeCollectionUpdate(String nodeName, String collectionName, DocCollection docCollection) {
+ boolean needToUpdateCollection = false;
+ List<String> downedReplicas = new ArrayList<>();
+ Map<String,Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
- if (needToUpdateCollection) {
- if (docCollection.isPerReplicaState()) {
- zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
- PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
- } else {
- zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+ for (Entry<String, Slice> sliceEntry : slicesCopy.entrySet()) {
+ Slice slice = sliceEntry.getValue();
+ Map<String, Replica> newReplicas = slice.getReplicasCopy();
+
+ Collection<Replica> replicas = slice.getReplicas();
+ for (Replica replica : replicas) {
+ String rNodeName = replica.getNodeName();
+ if (rNodeName == null) {
+ throw new RuntimeException("Replica without node name! " + replica);
+ }
+ if (rNodeName.equals(nodeName)) {
+ log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
+ Map<String, Object> props = replica.shallowCopy();
+ Replica newReplica = new Replica(replica.getName(), replica.node, replica.collection, slice.getName(), replica.core,
+ Replica.State.DOWN, replica.type, props);
+ newReplicas.put(replica.getName(), newReplica);
+ needToUpdateCollection = true;
+ downedReplicas.add(replica.getName());
}
}
+
+ Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy(),collectionName);
+ slicesCopy.put(slice.getName(), newSlice);
}
- return zkWriteCommands;
+ if (needToUpdateCollection) {
+ if (docCollection.isPerReplicaState()) {
+ return Optional.of(new ZkWriteCommand(collectionName, docCollection.copyWithSlices(slicesCopy),
+ PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+ } else {
+ return Optional.of(new ZkWriteCommand(collectionName, docCollection.copyWithSlices(slicesCopy)));
+ }
+ } else {
+ // No update needed for this collection
+ return Optional.empty();
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 2594ee4..97285cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -229,7 +229,7 @@ public class SliceMutator {
DocCollection collection = clusterState.getCollection(collectionName);
Slice slice = collection.getSlice(shard);
if (slice == null) {
- throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collectionName + " slice:" + shard);
+ throw new RuntimeException("Overseer.addRoutingRule collection: " + collectionName + ", unknown slice: " + shard);
}
Map<String, RoutingRule> routingRules = slice.getRoutingRules();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 978b209..fd2e020 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -28,6 +28,7 @@ import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
@@ -59,7 +60,7 @@ public class ZkStateWriter {
/**
* Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
*/
- public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
+ public static ZkWriteCommand NO_OP = ZkWriteCommand.NO_OP;
protected final ZkStateReader reader;
protected final Stats stats;
@@ -217,14 +218,17 @@ public class ZkStateWriter {
ZkWriteCommand cmd = entry.getValue();
DocCollection c = cmd.collection;
- if(cmd.ops != null && cmd.ops.isPreOp()) {
+ // Update the Per Replica State znodes if needed
+ if (cmd.ops != null) {
cmd.ops.persist(path, reader.getZkClient());
clusterState = clusterState.copyWith(name,
cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
}
- if (!cmd.persistCollState) continue;
+
+ // Update the state.json file if needed
+ if (!cmd.persistJsonState) continue;
if (c == null) {
- // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
+ // let's clean up the state.json of this collection only, the rest should be cleaned by delete collection cmd
log.debug("going to delete state.json {}", path);
reader.getZkClient().clean(path);
} else {
@@ -243,13 +247,18 @@ public class ZkStateWriter {
clusterState = clusterState.copyWith(name, newCollection);
}
}
- if(cmd.ops != null && !cmd.ops.isPreOp()) {
- cmd.ops.persist(path, reader.getZkClient());
- DocCollection currentCollState = clusterState.getCollection(cmd.name);
- if ( currentCollState != null) {
- clusterState = clusterState.copyWith(name,
- currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
- }
+
+ // When dealing with a per replica collection that did not do any update to the per replica states znodes but did
+ // update state.json, we add and then remove a dummy node to change the cversion of the parent znode.
+ // This is not needed by Solr: there's no code watching the children without also watching the state.json node itself.
+ // It would be useful for external code watching the collection's ZooKeeper state.json node children but not the node itself.
+ if (cmd.ops == null && cmd.isPerReplicaStateCollection) {
+ PerReplicaStatesOps.touchChildren().persist(path, reader.getZkClient());
+ DocCollection currentCollState = clusterState.getCollection(cmd.name);
+ if (currentCollState != null) {
+ clusterState = clusterState.copyWith(name,
+ currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index 39d953c..57fc3a5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -20,51 +20,34 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
public class ZkWriteCommand {
+ /**
+ * Single NO_OP instance, can be compared with ==
+ */
+ static final ZkWriteCommand NO_OP = new ZkWriteCommand(null, null);
public final String name;
public final DocCollection collection;
+ public final boolean isPerReplicaStateCollection;
- public final boolean noop;
// persist the collection state. If this is false, it means the collection state is not modified
- public final boolean persistCollState;
+ public final boolean persistJsonState;
public final PerReplicaStatesOps ops;
- public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistCollState) {
- boolean isPerReplicaState = collection.isPerReplicaState();
- this.name = name;
- this.collection = collection;
- this.noop = false;
- this.ops = isPerReplicaState ? replicaOps : null;
- this.persistCollState = isPerReplicaState ? persistCollState : true;
- }
- public ZkWriteCommand(String name, DocCollection collection) {
+ public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistJsonState) {
+ isPerReplicaStateCollection = collection != null && collection.isPerReplicaState();
this.name = name;
this.collection = collection;
- this.noop = false;
- persistCollState = true;
- this.ops = collection != null && collection.isPerReplicaState() ?
- PerReplicaStatesOps.touchChildren():
- null;
+ this.ops = replicaOps;
+ this.persistJsonState = persistJsonState || !isPerReplicaStateCollection; // Always persist for non "per replica state" collections
}
- /**
- * Returns a no-op
- */
- protected ZkWriteCommand() {
- this.noop = true;
- this.name = null;
- this.collection = null;
- this.ops = null;
- persistCollState = true;
- }
-
- public static ZkWriteCommand noop() {
- return new ZkWriteCommand();
+ public ZkWriteCommand(String name, DocCollection collection) {
+ this(name, collection, null, true);
}
@Override
public String toString() {
- return getClass().getSimpleName() + ": " + (noop ? "no-op" : name + "=" + collection);
+ return getClass().getSimpleName() + ": " + (this == NO_OP ? "no-op" : name + "=" + collection);
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/CloudConfig.java b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
index c8efbbb..440f128 100644
--- a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
@@ -48,10 +48,12 @@ public class CloudConfig {
private final String pkiHandlerPublicKeyPath;
+ private final boolean useDistributedClusterStateUpdates;
+
CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames,
int leaderVoteWait, int leaderConflictResolveWait, String zkCredentialsProviderClass, String zkACLProviderClass,
int createCollectionWaitTimeTillActive, boolean createCollectionCheckLeaderActive, String pkiHandlerPrivateKeyPath,
- String pkiHandlerPublicKeyPath) {
+ String pkiHandlerPublicKeyPath, boolean useDistributedClusterStateUpdates) {
this.zkHost = zkHost;
this.zkClientTimeout = zkClientTimeout;
this.hostPort = hostPort;
@@ -66,6 +68,7 @@ public class CloudConfig {
this.createCollectionCheckLeaderActive = createCollectionCheckLeaderActive;
this.pkiHandlerPrivateKeyPath = pkiHandlerPrivateKeyPath;
this.pkiHandlerPublicKeyPath = pkiHandlerPublicKeyPath;
+ this.useDistributedClusterStateUpdates = useDistributedClusterStateUpdates;
if (this.hostPort == -1)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'hostPort' must be configured to run SolrCloud");
@@ -129,6 +132,10 @@ public class CloudConfig {
return pkiHandlerPublicKeyPath;
}
+ public boolean getDistributedClusterStateUpdates() {
+ return useDistributedClusterStateUpdates;
+ }
+
public static class CloudConfigBuilder {
private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 45000;
@@ -151,6 +158,7 @@ public class CloudConfig {
private boolean createCollectionCheckLeaderActive = DEFAULT_CREATE_COLLECTION_CHECK_LEADER_ACTIVE;
private String pkiHandlerPrivateKeyPath;
private String pkiHandlerPublicKeyPath;
+ private boolean useDistributedClusterStateUpdates = false;
public CloudConfigBuilder(String hostName, int hostPort) {
this(hostName, hostPort, null);
@@ -217,10 +225,15 @@ public class CloudConfig {
return this;
}
+ public CloudConfigBuilder setUseDistributedClusterStateUpdates(boolean useDistributedClusterStateUpdates) {
+ this.useDistributedClusterStateUpdates = useDistributedClusterStateUpdates;
+ return this;
+ }
+
public CloudConfig build() {
return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait,
leaderConflictResolveWait, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
- createCollectionCheckLeaderActive, pkiHandlerPrivateKeyPath, pkiHandlerPublicKeyPath);
+ createCollectionCheckLeaderActive, pkiHandlerPrivateKeyPath, pkiHandlerPublicKeyPath, useDistributedClusterStateUpdates);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index b222c27..c922982 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -485,6 +485,9 @@ public class SolrXmlConfig {
case "pkiHandlerPublicKeyPath":
builder.setPkiHandlerPublicKeyPath(value);
break;
+ case "distributedClusterStateUpdates":
+ builder.setUseDistributedClusterStateUpdates(Boolean.parseBoolean(value));
+ break;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown configuration parameter in <solrcloud> section of solr.xml: " + name);
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 54aaf28..3c51a6c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerSolrResponseSerializer;
import org.apache.solr.cloud.OverseerTaskQueue;
@@ -125,13 +126,12 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
protected final CoreContainer coreContainer;
private final CollectionHandlerApi v2Handler;
+ private final DistributedClusterStateUpdater distributedClusterStateUpdater;
public CollectionsHandler() {
- super();
// Unlike most request handlers, CoreContainer initialization
// should happen in the constructor...
- this.coreContainer = null;
- v2Handler = new CollectionHandlerApi(this);
+ this(null);
}
@@ -143,6 +143,17 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
public CollectionsHandler(final CoreContainer coreContainer) {
this.coreContainer = coreContainer;
v2Handler = new CollectionHandlerApi(this);
+ // Get the state change factory to know whether we need to enqueue to Overseer or process distributed.
+ // Some SolrCloud tests do not need Zookeeper and end up with a null cloudConfig in NodeConfig (because
+ // TestHarness.buildTestNodeConfig() uses the zkHost to decide it's SolrCloud).
+ // These tests do not use Zookeeper and do not do state updates (see subclasses of TestBaseStatsCacheCloud).
+ // Some non-SolrCloud tests do not even pass a config at all, so let's be cautious here (code is not pretty).
+ // We do want to initialize here and not do it lazy to not deal with synchronization for actual prod code.
+ if (coreContainer == null || coreContainer.getConfig() == null || coreContainer.getConfig().getCloudConfig() == null) {
+ distributedClusterStateUpdater = null;
+ } else {
+ distributedClusterStateUpdater = new DistributedClusterStateUpdater(coreContainer.getConfig().getCloudConfig().getDistributedClusterStateUpdates());
+ }
}
@Override
@@ -246,7 +257,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
rsp.setException(exp);
}
- //TODO yuck; shouldn't create-collection at the overseer do this? (conditionally perhaps)
+ // Even if Overseer does wait for the collection to be created, it sees a different cluster state than this node,
+ // so this wait is required to make sure the local node's Zookeeper watches have fired and the node now sees the collection.
if (action.equals(CollectionAction.CREATE) && asyncId == null) {
if (rsp.getException() == null) {
waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
@@ -254,8 +266,16 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
} else {
- // submits and doesn't wait for anything (no response)
- coreContainer.getZkController().getOverseer().offerStateUpdate(Utils.toJSON(props));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ DistributedClusterStateUpdater.MutatingCommand command = DistributedClusterStateUpdater.MutatingCommand.getCommandFor(operation.action);
+ ZkNodeProps message = new ZkNodeProps(props);
+ // We do the state change synchronously but do not wait for it to be visible in this node's cluster state updated via ZK watches
+ distributedClusterStateUpdater.doSingleStateUpdate(command, message,
+ coreContainer.getZkController().getSolrCloudManager(), coreContainer.getZkController().getZkStateReader());
+ } else {
+ // submits and doesn't wait for anything (no response)
+ coreContainer.getZkController().getOverseer().offerStateUpdate(Utils.toJSON(props));
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index aa7c61e..ece4f53 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
@@ -48,6 +49,7 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CommonParams;
@@ -97,6 +99,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
private final boolean cloneRequiredOnLeader;
+ private final DistributedClusterStateUpdater distributedClusterStateUpdater;
//used for keeping track of replicas that have processed an add/update from the leader
private RollupRequestReplicationTracker rollupReplicationTracker = null;
@@ -110,6 +113,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
CoreContainer cc = req.getCore().getCoreContainer();
cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
zkController = cc.getZkController();
+ distributedClusterStateUpdater = zkController.getDistributedClusterStateUpdater();
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
collection = cloudDesc.getCollectionName();
@@ -936,7 +940,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
ZkStateReader.COLLECTION_PROP, collection,
ZkStateReader.SHARD_ID_PROP, myShardId,
"routeKey", routeKey + "!");
- zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
+ if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+ ZkNodeProps message = new ZkNodeProps(map);
+ distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceRemoveRoutingRule, message,
+ zkController.getOverseer().getSolrCloudManager(), zkController.getOverseer().getZkStateReader());
+ } else {
+ zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
+ }
} catch (KeeperException e) {
log.warn("Exception while removing routing rule for route key: {}", routeKey, e);
} catch (Exception e) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java b/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java
index 11e0055..5370713 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java
@@ -56,6 +56,7 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
" <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
" <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
" <int name=\"createCollectionWaitTimeTillActive\">${createCollectionWaitTimeTillActive:10}</int>\n" +
+ " <str name=\"distributedClusterStateUpdates\">${solr.distributedClusterStateUpdates:false}</str> \n" +
" </solrcloud>\n" +
" \n" +
"</solr>\n";
@@ -66,6 +67,7 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
configureCluster(1)
.addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.withSolrXml(CLOUD_SOLR_XML_WITH_10S_CREATE_COLL_WAIT)
+ .useOtherClusterStateUpdateStrategy()
.configure();
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index e721517..de1d19a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -77,6 +77,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
// these tests need to be isolated, so we dont share the minicluster
configureCluster(4)
.addConfig("conf", configset("cloud-minimal"))
+ .useOtherClusterStateUpdateStrategy() // Some tests (this one) use "the other" cluster state update strategy to increase coverage
.configure();
}
@@ -231,7 +232,8 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
! r.equals(shard.getLeader())));
JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
- ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
+ ZkController replicaZkController = replicaJetty.getCoreContainer().getZkController();
+ ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaZkController.getZkStateReader());
final long preDeleteWatcherCount = countUnloadCoreOnDeletedWatchers
(accessor.getStateWatchers(collectionName));
@@ -243,7 +245,14 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replica.getName());
- cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ if (replicaZkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ cluster.getOpenOverseer().getDistributedClusterStateUpdater().doSingleStateUpdate(
+ DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+ cluster.getOpenOverseer().getSolrCloudManager(),
+ cluster.getOpenOverseer().getZkStateReader());
+ } else {
+ cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ }
waitForState("Timeout waiting for replica get deleted", collectionName,
(liveNodes, collectionState) -> collectionState.getSlice("shard1").getReplicas().size() == 2);
@@ -301,13 +310,22 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
log.info("Running delete core {}",cd);
try {
+ ZkController replica1ZkController = replica1Jetty.getCoreContainer().getZkController();
ZkNodeProps m = new ZkNodeProps(
Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
ZkStateReader.CORE_NAME_PROP, replica1.getCoreName(),
ZkStateReader.NODE_NAME_PROP, replica1.getNodeName(),
ZkStateReader.COLLECTION_PROP, collectionName,
ZkStateReader.CORE_NODE_NAME_PROP, replica1.getName());
- cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+
+ if (replica1ZkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ cluster.getOpenOverseer().getDistributedClusterStateUpdater().doSingleStateUpdate(
+ DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+ cluster.getOpenOverseer().getSolrCloudManager(),
+ cluster.getOpenOverseer().getZkStateReader());
+ } else {
+ cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ }
boolean replicaDeleted = false;
TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
@@ -345,8 +363,14 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
try {
replica1Jetty.stop();
waitForNodeLeave(replica1JettyNodeName);
- waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState)
- -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+
+ // There is a race condition: the replica might be marked down before we get here, in which case we never get notified
+ // So we check before waiting... Not eliminating but significantly reducing the race window - eliminating would require
+ // deeper changes in the code where the watcher is set.
+ if (getCollectionState(collectionName).getSlice("shard1").getReplica(replica1.getName()).getState() != DOWN) {
+ waitForState("Expected replica:" + replica1 + " get down", collectionName, (liveNodes, collectionState)
+ -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+ }
replica1Jetty.start();
waitingForReplicaGetDeleted.acquire();
} finally {
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index 9f3f9a4..5c3aa01 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -94,13 +94,22 @@ public class DeleteShardTest extends SolrCloudTestCase {
CloudSolrClient client = cluster.getSolrClient();
// TODO can this be encapsulated better somewhere?
- DistributedQueue inQueue = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
Map<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(slice, state.toString());
propMap.put(ZkStateReader.COLLECTION_PROP, collection);
ZkNodeProps m = new ZkNodeProps(propMap);
- inQueue.offer(Utils.toJSON(m));
+
+ final Overseer overseer = cluster.getOpenOverseer();
+ if (overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ overseer.getDistributedClusterStateUpdater().doSingleStateUpdate(
+ DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+ cluster.getOpenOverseer().getSolrCloudManager(),
+ cluster.getOpenOverseer().getZkStateReader());
+ } else {
+ DistributedQueue inQueue = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
+ inQueue.offer(Utils.toJSON(m));
+ }
waitForState("Expected shard " + slice + " to be in state " + state.toString(), collection, (n, c) -> {
return c.getSlice(slice).getState() == state;
diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java b/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java
index 7281396..093fe1c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java
@@ -26,8 +26,17 @@ public class MockSolrSource {
public static ZkController makeSimpleMock(Overseer overseer, ZkStateReader reader, SolrZkClient zkClient) {
ZkController zkControllerMock = mock(ZkController.class);
- if (overseer == null) overseer = mock(Overseer.class);
-
+ final DistributedClusterStateUpdater distributedClusterStateUpdater;
+ if (overseer == null) {
+ // When no overseer is passed, the Overseer queue does nothing. Replicate this in how we handle distributed state
+ // updates by doing nothing as well...
+ distributedClusterStateUpdater = mock(DistributedClusterStateUpdater.class);
+ overseer = mock(Overseer.class);
+ when(overseer.getDistributedClusterStateUpdater()).thenReturn(distributedClusterStateUpdater);
+ } else {
+ // Use the same configuration for state updates as the Overseer.
+ distributedClusterStateUpdater = overseer.getDistributedClusterStateUpdater();
+ }
if (reader != null && zkClient == null) {
zkClient = reader.getZkClient();
@@ -38,11 +47,11 @@ public class MockSolrSource {
when(reader.getZkClient()).thenReturn(zkClient);
}
-
when(zkControllerMock.getOverseer()).thenReturn(overseer);
when(zkControllerMock.getZkStateReader()).thenReturn(reader);
when(zkControllerMock.getZkClient()).thenReturn(zkClient);
when(zkControllerMock.getOverseer()).thenReturn(overseer);
+ when(zkControllerMock.getDistributedClusterStateUpdater()).thenReturn(distributedClusterStateUpdater);
return zkControllerMock;
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 40538c5..132d069 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -107,6 +107,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
private static OverseerTaskQueue workQueueMock;
private static OverseerTaskQueue stateUpdateQueueMock;
private static Overseer overseerMock;
+ private static DistributedClusterStateUpdater distributedClusterStateUpdater;
+ private static DistributedClusterStateUpdater.StateChangeRecorder stateChangeRecorder;
private static ZkController zkControllerMock;
private static SolrCloudManager cloudDataProviderMock;
private static ClusterStateProvider clusterStateProviderMock;
@@ -152,7 +154,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
DistributedMap completedMap,
DistributedMap failureMap,
SolrMetricsContext solrMetricsContext) {
- super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap, solrMetricsContext);
+ super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap, solrMetricsContext);
}
@Override
@@ -177,6 +179,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
clusterStateMock = mock(ClusterState.class);
solrZkClientMock = mock(SolrZkClient.class);
overseerMock = mock(Overseer.class);
+ distributedClusterStateUpdater = mock(DistributedClusterStateUpdater.class);
+ stateChangeRecorder = mock(DistributedClusterStateUpdater.StateChangeRecorder.class);
zkControllerMock = mock(ZkController.class);
cloudDataProviderMock = mock(SolrCloudManager.class);
objectCache = new ObjectCache();
@@ -204,6 +208,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
clusterStateMock = null;
solrZkClientMock = null;
overseerMock = null;
+ distributedClusterStateUpdater = null;
+ stateChangeRecorder = null;
zkControllerMock = null;
cloudDataProviderMock = null;
clusterStateProviderMock = null;
@@ -232,6 +238,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
reset(clusterStateMock);
reset(solrZkClientMock);
reset(overseerMock);
+ reset(distributedClusterStateUpdater);
+ reset(stateChangeRecorder);
reset(zkControllerMock);
reset(cloudDataProviderMock);
objectCache.clear();
@@ -259,7 +267,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
}
@SuppressWarnings("unchecked")
- protected Set<String> commonMocks(int liveNodesCount) throws Exception {
+ protected Set<String> commonMocks(int liveNodesCount, boolean distributedClusterStateUpdates) throws Exception {
when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock);
when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> {
Object result;
@@ -379,6 +387,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
when(overseerMock.getZkController()).thenReturn(zkControllerMock);
when(overseerMock.getSolrCloudManager()).thenReturn(cloudDataProviderMock);
when(overseerMock.getCoreContainer()).thenReturn(coreContainerMock);
+ when(overseerMock.getDistributedClusterStateUpdater()).thenReturn(distributedClusterStateUpdater);
+ when(distributedClusterStateUpdater.createStateChangeRecorder(any(), anyBoolean())).thenReturn(stateChangeRecorder);
when(coreContainerMock.getUpdateShardHandler()).thenReturn(updateShardHandlerMock);
when(coreContainerMock.getPlacementPluginFactory()).thenReturn(placementPluginFactoryMock);
when(updateShardHandlerMock.getDefaultHttpClient()).thenReturn(httpClientMock);
@@ -450,21 +460,45 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
when(overseerMock.getStateUpdateQueue(any())).thenReturn(stateUpdateQueueMock);
when(overseerMock.getStateUpdateQueue()).thenReturn(stateUpdateQueueMock);
-
- Mockito.doAnswer(
- new Answer<Void>() {
- public Void answer(InvocationOnMock invocation) {
- try {
- handleCreateCollMessage(invocation.getArgument(0));
- stateUpdateQueueMock.offer(invocation.getArgument(0));
- } catch (KeeperException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+
+ // Selecting the cluster state update strategy: Overseer when distributedClusterStateUpdates is false, otherwise distributed updates.
+ when(distributedClusterStateUpdater.isDistributedStateUpdate()).thenReturn(distributedClusterStateUpdates);
+
+ if (distributedClusterStateUpdates) {
+ // Mocking for state change via distributed updates. There are two types of updates done in CreateCollectionCmd:
+ // 1. Single line recording and executing a command
+ Mockito.doAnswer(
+ new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ handleCreateCollMessageProps(invocation.getArgument(1));
+ return null;
+ }}).when(distributedClusterStateUpdater).doSingleStateUpdate(any(), any(), any(), any());
+
+ // 2. Recording a command to be executed as part of a batch of commands
+ Mockito.doAnswer(
+ new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ handleCreateCollMessageProps(invocation.getArgument(1));
+ return null;
+ }}).when(stateChangeRecorder).record(any(), any());
+ } else {
+ // Mocking for state change via the Overseer queue
+ Mockito.doAnswer(
+ new Answer<Void>() {
+ public Void answer(InvocationOnMock invocation) {
+ try {
+ handleCreateCollMessage(invocation.getArgument(0));
+ stateUpdateQueueMock.offer(invocation.getArgument(0));
+ } catch (KeeperException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
}
- return null;
- }}).when(overseerMock).offerStateUpdate(any());
-
+ }).when(overseerMock).offerStateUpdate(any());
+ }
+
when(zkControllerMock.getZkClient()).thenReturn(solrZkClientMock);
when(cloudManagerMock.getDistribStateManager()).thenReturn(distribStateManagerMock);
@@ -520,9 +554,12 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
}
private void handleCreateCollMessage(byte[] bytes) {
+ handleCreateCollMessageProps(ZkNodeProps.load(bytes));
+ }
+
+ private void handleCreateCollMessageProps(ZkNodeProps props) {
log.info("track created replicas / collections");
try {
- ZkNodeProps props = ZkNodeProps.load(bytes);
if (CollectionParams.CollectionAction.CREATE.isEqual(props.getStr("operation"))) {
String collName = props.getStr("name");
if (collName != null) collectionsSet.put(collName, new ClusterState.CollectionRef(
@@ -732,12 +769,11 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
SEND_NULL
}
protected void testTemplate(Integer numberOfNodes, Integer numberOfNodesToCreateOn, CreateNodeListOptions createNodeListOption, Integer replicationFactor,
- Integer numberOfSlices,
- boolean collectionExceptedToBeCreated) throws Exception {
+ Integer numberOfSlices, boolean collectionExceptedToBeCreated, boolean distributedClusterStateUpdates) throws Exception {
assertTrue("Wrong usage of testTemplate. numberOfNodesToCreateOn " + numberOfNodesToCreateOn + " is not allowed to be higher than numberOfNodes " + numberOfNodes, numberOfNodes.intValue() >= numberOfNodesToCreateOn.intValue());
assertTrue("Wrong usage of testTemplage. createNodeListOption has to be " + CreateNodeListOptions.SEND + " when numberOfNodes and numberOfNodesToCreateOn are unequal", ((createNodeListOption == CreateNodeListOptions.SEND) || (numberOfNodes.intValue() == numberOfNodesToCreateOn.intValue())));
- Set<String> liveNodes = commonMocks(numberOfNodes);
+ Set<String> liveNodes = commonMocks(numberOfNodes, distributedClusterStateUpdates);
List<String> createNodeList = new ArrayList<>();
int i = 0;
for (String node : liveNodes) {
@@ -774,140 +810,250 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
createNodeList, dontShuffleCreateNodeSet);
}
}
- @Test
- public void testNoReplicationEqualNumberOfSlicesPerNode() throws Exception {
+
+ // Tests below are being run twice: once with Overseer based updates and once with distributed updates.
+ // This is done explicitly here because these tests use mocks that can be configured directly.
+ // Tests not using mocks (most other tests) but using the MiniSolrCloudCluster are randomized to sometimes use Overseer
+ // and sometimes distributed state updates (but not both for a given test and a given test seed).
+ // See the SolrCloudTestCase.Builder constructor and the rest of the Builder class.
+
+ @Test
+ public void testNoReplicationEqualNumberOfSlicesPerNodeOverseer() throws Exception {
+ testNoReplicationEqualNumberOfSlicesPerNodeInternal(false);
+ }
+
+ @Test
+ public void testNoReplicationEqualNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+ testNoReplicationEqualNumberOfSlicesPerNodeInternal(true);
+ }
+
+ private void testNoReplicationEqualNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
Integer replicationFactor = 1;
Integer numberOfSlices = 8;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
+ }
+
+ @Test
+ public void testReplicationEqualNumberOfSlicesPerNodeOverseer() throws Exception {
+ testReplicationEqualNumberOfSlicesPerNodeInternal(false);
}
-
@Test
- public void testReplicationEqualNumberOfSlicesPerNode() throws Exception {
+ public void testReplicationEqualNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+ testReplicationEqualNumberOfSlicesPerNodeInternal(true);
+ }
+
+ private void testReplicationEqualNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
Integer replicationFactor = 2;
Integer numberOfSlices = 4;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
+ @Test
+ public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesOverseer() throws Exception {
+ testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(false);
+ }
+
@Test
- public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodes() throws Exception {
+ public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesDistributedUpdates() throws Exception {
+ testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(true);
+ }
+
+ private void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
Integer replicationFactor = 1;
Integer numberOfSlices = 8;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
@Test
- public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodes() throws Exception {
+ public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesOverseer() throws Exception {
+ testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(false);
+ }
+
+ @Test
+ public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesDistributedUpdates() throws Exception {
+ testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(true);
+ }
+
+ private void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
Integer replicationFactor = 2;
Integer numberOfSlices = 4;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
+ @Test
+ public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesOverseer() throws Exception {
+ testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(false);
+ }
+
@Test
- public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodes() throws Exception {
+ public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesDistributedUpdates() throws Exception {
+ testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(true);
+ }
+
+ private void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND_NULL;
Integer replicationFactor = 1;
Integer numberOfSlices = 8;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
+ @Test
+ public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesOverseer() throws Exception {
+ testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(false);
+ }
+
@Test
- public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodes() throws Exception {
+ public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesDistributedUpdates() throws Exception {
+ testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(true);
+ }
+
+ private void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND_NULL;
Integer replicationFactor = 2;
Integer numberOfSlices = 4;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
- }
-
+ true, distributedClusterStateUpdates);
+ }
+
+ @Test
+ public void testNoReplicationUnequalNumberOfSlicesPerNodeOverseer() throws Exception {
+ testNoReplicationUnequalNumberOfSlicesPerNodeInternal(false);
+ }
+
@Test
- public void testNoReplicationUnequalNumberOfSlicesPerNode() throws Exception {
+ public void testNoReplicationUnequalNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+ testNoReplicationUnequalNumberOfSlicesPerNodeInternal(true);
+ }
+
+ private void testNoReplicationUnequalNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
Integer replicationFactor = 1;
Integer numberOfSlices = 6;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
+ @Test
+ public void testReplicationUnequalNumberOfSlicesPerNodeOverseer() throws Exception {
+ testReplicationUnequalNumberOfSlicesPerNodeInternal(false);
+ }
+
@Test
- public void testReplicationUnequalNumberOfSlicesPerNode() throws Exception {
+ public void testReplicationUnequalNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+ testReplicationUnequalNumberOfSlicesPerNodeInternal(true);
+ }
+
+ private void testReplicationUnequalNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 4;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
Integer replicationFactor = 2;
Integer numberOfSlices = 3;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
@Test
- public void testNoReplicationLimitedNodesToCreateOn()
- throws Exception {
+ public void testNoReplicationLimitedNodesToCreateOnOverseer() throws Exception {
+ testNoReplicationLimitedNodesToCreateOnInternal(false);
+ }
+
+ @Test
+ public void testNoReplicationLimitedNodesToCreateOnDistributedUpdates() throws Exception {
+ testNoReplicationLimitedNodesToCreateOnInternal(true);
+ }
+
+ private void testNoReplicationLimitedNodesToCreateOnInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 2;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
Integer replicationFactor = 1;
Integer numberOfSlices = 6;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
-
+
+ @Test
+ public void testReplicationLimitedNodesToCreateOnOverseer() throws Exception {
+ testReplicationLimitedNodesToCreateOnInternal(false);
+ }
+
@Test
- public void testReplicationLimitedNodesToCreateOn()
- throws Exception {
+ public void testReplicationLimitedNodesToCreateOnDistributedUpdates() throws Exception {
+ testReplicationLimitedNodesToCreateOnInternal(true);
+ }
+
+ private void testReplicationLimitedNodesToCreateOnInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 2;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
Integer replicationFactor = 2;
Integer numberOfSlices = 3;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- true);
+ true, distributedClusterStateUpdates);
}
@Test
- public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimits()
- throws Exception {
+ public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsOverseer() throws Exception {
+ testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(false);
+ }
+
+ @Test
+ public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsDistributedUpdates() throws Exception {
+ testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(true);
+ }
+
+ private void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 3;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
Integer replicationFactor = 1;
Integer numberOfSlices = 8;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- false);
+ false, distributedClusterStateUpdates);
}
-
+
+ @Test
+ public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsOverseer() throws Exception {
+ testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(false);
+ }
+
@Test
- public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimits()
- throws Exception {
+ public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsDistributedUpdates() throws Exception {
+ testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(true);
+ }
+
+ private void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(boolean distributedClusterStateUpdates) throws Exception {
Integer numberOfNodes = 4;
Integer numberOfNodesToCreateOn = 3;
CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
Integer replicationFactor = 2;
Integer numberOfSlices = 4;
testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
- false);
+ false, distributedClusterStateUpdates);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
index e6d26f9..05974a3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
@@ -26,10 +26,7 @@ import java.util.function.Predicate;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
@@ -157,9 +154,7 @@ public class OverseerRolesTest extends SolrCloudTestCase {
String leaderId = OverseerCollectionConfigSetProcessor.getLeaderId(zkClient());
String leader = OverseerCollectionConfigSetProcessor.getLeaderNode(zkClient());
log.info("### Sending QUIT to overseer {}", leader);
- getOverseerJetty().getCoreContainer().getZkController().getOverseer().getStateUpdateQueue()
- .offer(Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
- "id", leaderId)));
+ getOverseerJetty().getCoreContainer().getZkController().getOverseer().sendQuitToOverseer(leaderId);
waitForNewOverseer(15, s -> Objects.equals(leader, s) == false, false);
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
index 7293035..671ff3d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
@@ -49,8 +49,11 @@ public class OverseerStatusTest extends SolrCloudTestCase {
SimpleOrderedMap<Object> createcollection
= (SimpleOrderedMap<Object>) collection_operations.get(CollectionParams.CollectionAction.CREATE.toLower());
assertEquals("No stats for create in OverseerCollectionProcessor", numCollectionCreates + 1, createcollection.get("requests"));
- createcollection = (SimpleOrderedMap<Object>) overseer_operations.get(CollectionParams.CollectionAction.CREATE.toLower());
- assertEquals("No stats for create in Overseer", numOverseerCreates + 1, createcollection.get("requests"));
+ // When cluster state updates are distributed, Overseer doesn't see the updates and doesn't report stats on them.
+ if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ createcollection = (SimpleOrderedMap<Object>) overseer_operations.get(CollectionParams.CollectionAction.CREATE.toLower());
+ assertEquals("No stats for create in Overseer", numOverseerCreates + 1, createcollection.get("requests"));
+ }
// Reload the collection
CollectionAdminRequest.reloadCollection(collectionName).process(cluster.getSolrClient());
@@ -81,19 +84,21 @@ public class OverseerStatusTest extends SolrCloudTestCase {
assertNotNull(amIleader.get("errors"));
assertNotNull(amIleader.get("avgTimePerRequest"));
- amIleader = (SimpleOrderedMap<Object>) overseer_operations.get("am_i_leader");
- assertNotNull("Overseer amILeader stats should not be null", amIleader);
- assertNotNull(amIleader.get("requests"));
- assertTrue(Integer.parseInt(amIleader.get("requests").toString()) > 0);
- assertNotNull(amIleader.get("errors"));
- assertNotNull(amIleader.get("avgTimePerRequest"));
-
- SimpleOrderedMap<Object> updateState = (SimpleOrderedMap<Object>) overseer_operations.get("update_state");
- assertNotNull("Overseer update_state stats should not be null", updateState);
- assertNotNull(updateState.get("requests"));
- assertTrue(Integer.parseInt(updateState.get("requests").toString()) > 0);
- assertNotNull(updateState.get("errors"));
- assertNotNull(updateState.get("avgTimePerRequest"));
+ // When cluster state updates are distributed, Overseer doesn't see the updates and doesn't report stats on them.
+ if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ amIleader = (SimpleOrderedMap<Object>) overseer_operations.get("am_i_leader");
+ assertNotNull("Overseer amILeader stats should not be null", amIleader);
+ assertNotNull(amIleader.get("requests"));
+ assertTrue(Integer.parseInt(amIleader.get("requests").toString()) > 0);
+ assertNotNull(amIleader.get("errors"));
+ assertNotNull(amIleader.get("avgTimePerRequest"));
+ SimpleOrderedMap<Object> updateState = (SimpleOrderedMap<Object>) overseer_operations.get("update_state");
+ assertNotNull("Overseer update_state stats should not be null", updateState);
+ assertNotNull(updateState.get("requests"));
+ assertTrue(Integer.parseInt(updateState.get("requests").toString()) > 0);
+ assertNotNull(updateState.get("errors"));
+ assertNotNull(updateState.get("avgTimePerRequest"));
+ }
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 1632617..a7c8e48 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -188,7 +188,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
"createNodeSet", "");
- ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
+ final Overseer overseer = MiniSolrCloudCluster.getOpenOverseer(overseers);
+ // This being an Overseer test, we force it to use the Overseer based cluster state update. Look for "new Overseer" calls in this class.
+ assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
+ ZkDistributedQueue q = overseer.getStateUpdateQueue();
q.offer(Utils.toJSON(m));
}
@@ -204,6 +207,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.CORE_NAME_PROP, coreName,
ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
ZkStateReader.COLLECTION_PROP, collection);
+ assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
ZkDistributedQueue q = overseer.getStateUpdateQueue();
q.offer(Utils.toJSON(m));
return null;
@@ -387,7 +391,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
"createNodeSet", "");
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
q.offer(Utils.toJSON(m));
}
@@ -530,7 +534,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
List<ZkWriteCommand> commands = new NodeMutator().downNode(reader.getClusterState(), m);
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
q.offer(Utils.toJSON(m));
@@ -586,7 +590,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
createCollection(COLLECTION, 1);
@@ -640,7 +644,15 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
private Overseer getOpenOverseer() {
- return MiniSolrCloudCluster.getOpenOverseer(overseers);
+ Overseer overseer = MiniSolrCloudCluster.getOpenOverseer(overseers);
+ assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
+ return overseer;
+ }
+
+ private Overseer getOverseerZero() {
+ Overseer overseer = overseers.get(0);
+ assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
+ return overseer;
}
@Test
@@ -739,7 +751,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
httpShardHandlerFactory.init(new PluginInfo("shardHandlerFactory", Collections.emptyMap()));
httpShardHandlerFactorys.add(httpShardHandlerFactory);
Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
- new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
+ new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").setUseDistributedClusterStateUpdates(false).build());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
server.getZkAddress().replaceAll("/", "_"));
@@ -897,7 +909,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
electNewOverseer(server.getZkAddress());
- // Create collection znode before repeatedly trying to enqueue the Cluster state change message
+ // Create collection znode before repeatedly trying to enqueue the cluster state update message
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, true);
for (int i = 0; i < atLeast(4); i++) {
@@ -914,7 +926,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NUM_SHARDS_PROP, "1",
"createNodeSet", "");
- ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
+ ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
q.offer(Utils.toJSON(m));
break;
} catch (SolrException | KeeperException | AlreadyClosedException e) {
@@ -1104,7 +1116,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.NUM_SHARDS_PROP, "1",
ZkStateReader.REPLICATION_FACTOR, "1"
);
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
q.offer(Utils.toJSON(m));
}
@@ -1117,7 +1129,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.CORE_NODE_NAME_PROP, "node1",
ZkStateReader.COLLECTION_PROP, "perf" + j,
ZkStateReader.NUM_SHARDS_PROP, "1");
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
q.offer(Utils.toJSON(m));
if (j >= MAX_COLLECTIONS - 1) j = 0;
if (k >= MAX_CORES - 1) k = 0;
@@ -1216,7 +1228,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.ROLES_PROP, "",
ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
queue.offer(Utils.toJSON(m));
- m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.NODE_NAME_PROP, "node1:8983_",
ZkStateReader.SHARD_ID_PROP, "shard1",
ZkStateReader.COLLECTION_PROP, COLLECTION,
@@ -1228,7 +1240,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
//submit to proper queue
- queue = overseers.get(0).getStateUpdateQueue();
+ queue = getOverseerZero().getStateUpdateQueue();
m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
ZkStateReader.SHARD_ID_PROP, "shard1",
@@ -1265,7 +1277,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
createCollection("c1", 1);
@@ -1383,8 +1395,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkController zkController = createMockZkController(address, null, reader);
zkControllers.add(zkController);
+ // Create an Overseer with associated configuration to NOT USE distributed state update. Tests in this class really test the Overseer.
Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
- new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
+ new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").setUseDistributedClusterStateUpdates(false).build());
overseers.add(overseer);
ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
address.replaceAll("/", "_"));
@@ -1464,7 +1477,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
overseerClient = electNewOverseer(server.getZkAddress());
- ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+ ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
// create collection
{
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
index 5453574..3eac4c6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
@@ -181,8 +181,15 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase
if (log.isInfoEnabled()) {
log.info("Forcing {} to go into 'down' state", notLeader.getStr(ZkStateReader.CORE_NAME_PROP));
}
- ZkDistributedQueue q = jettys.get(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+
+ final Overseer overseer = jettys.get(0).getCoreContainer().getZkController().getOverseer();
+ if (overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ overseer.getDistributedClusterStateUpdater().doSingleStateUpdate(
+ DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState, m, overseer.getSolrCloudManager(), overseer.getZkStateReader());
+ } else {
+ ZkDistributedQueue q = overseer.getStateUpdateQueue();
+ q.offer(Utils.toJSON(m));
+ }
verifyReplicaStatus(cloudClient.getZkStateReader(), "football", "shard1", notLeader.getName(), Replica.State.DOWN);
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
index ce72800..ada8bd3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
@@ -121,7 +121,15 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
waitForState("Expected 2x1 for collection: " + collection, collection,
clusterShape(2, 2));
CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
- assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
+
+ // When cluster state updates are done in a distributed way, the stats that this test is verifying are not available.
+ // See comment in OverseerStatusCmd.call().
+ // Keeping the rest of the test running in case other errors can happen and can be caught...
+ // Eventually maintain per node cluster state updates stats and be able to check them here? Longer term question...
+
+ if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
+ }
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
@@ -186,15 +194,30 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
waitForState("Expected 2x2 for collection: " + collection, collection,
clusterShape(2, 4));
CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
- // 2 for recovering state, 4 for active state
- assertEquals(getNumStateOpeations(resp) + 6, getNumStateOpeations(resp2));
+
+ // See the comment in testSkipLeaderOperations() above for why this assert is skipped
+ if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ // 2 for recovering state, 4 for active state
+ assertEquals(getNumStateOpeations(resp) + 6, getNumStateOpeations(resp2));
+ }
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
}
+ /**
+ * Returns the value corresponding to stat: "overseer_operations", "leader", "requests"
+ * This stat (see {@link org.apache.solr.cloud.api.collections.OverseerStatusCmd}) is updated when the cluster state
+ * updater processes a message of type {@link org.apache.solr.cloud.overseer.OverseerAction#LEADER} to set a shard leader<p>
+ *
+ * The update happens in org.apache.solr.cloud.Overseer.ClusterStateUpdater.processQueueItem()
+ */
private int getNumLeaderOpeations(CollectionAdminResponse resp) {
return (int) resp.getResponse().findRecursive("overseer_operations", "leader", "requests");
}
+ /**
+ * "state" stats are when Overseer processes a {@link org.apache.solr.cloud.overseer.OverseerAction#STATE} message
+ * that sets replica properties
+ */
private int getNumStateOpeations(CollectionAdminResponse resp) {
return (int) resp.getResponse().findRecursive("overseer_operations", "state", "requests");
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 8018e78..ae9001b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -288,7 +288,12 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.NUM_SHARDS_PROP, "1",
"name", collectionName);
- zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
+ if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ClusterCreateCollection, m,
+ zkController.getSolrCloudManager(), zkController.getZkStateReader());
+ } else {
+ zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
+ }
HashMap<String, Object> propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -297,7 +302,12 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host1");
propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
propMap.put(ZkStateReader.STATE_PROP, "active");
- zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+ if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, new ZkNodeProps(propMap),
+ zkController.getSolrCloudManager(), zkController.getZkStateReader());
+ } else {
+ zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+ }
propMap = new HashMap<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -306,7 +316,12 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host2");
propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
propMap.put(ZkStateReader.STATE_PROP, "down");
- zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+ if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+ zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, new ZkNodeProps(propMap),
+ zkController.getSolrCloudManager(), zkController.getZkStateReader());
+ } else {
+ zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+ }
zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
diff --git a/solr/server/solr/solr.xml b/solr/server/solr/solr.xml
index e34d0a23..0fc94e2 100644
--- a/solr/server/solr/solr.xml
+++ b/solr/server/solr/solr.xml
@@ -45,6 +45,7 @@
<int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
<str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
<str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
+ <bool name="distributedClusterStateUpdates">${distributedClusterStateUpdates:false}</bool>
</solrcloud>
diff --git a/solr/solr-ref-guide/src/format-of-solr-xml.adoc b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
index 458b936..bdae4e5 100644
--- a/solr/solr-ref-guide/src/format-of-solr-xml.adoc
+++ b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
@@ -42,6 +42,7 @@ You can find `solr.xml` in your `$SOLR_HOME` directory (usually `server/solr` or
<int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
<str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
<str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
+ <bool name="distributedClusterStateUpdates">${distributedClusterStateUpdates:false}</bool>
</solrcloud>
<shardHandlerFactory name="shardHandlerFactory"
@@ -163,6 +164,9 @@ If `TRUE`, node names are not based on the address of the node, but on a generic
Optional parameters that can be specified if you are using <<zookeeper-access-control.adoc#,ZooKeeper Access Control>>.
+`distributedClusterStateUpdates`::
+If `TRUE`, the internal behavior of SolrCloud is changed so that updates to collections' `state.json` are not done through the Overseer but are made directly against ZooKeeper.
+
=== The <logging> Element
`class`::
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 0f5bb53..02bf6f9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -42,7 +42,6 @@ public class PerReplicaStatesOps {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private PerReplicaStates rs;
List<PerReplicaStates.Operation> ops;
- private boolean preOp = true;
final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
@@ -255,7 +254,6 @@ public class PerReplicaStatesOps {
}
return operations;
});
- result.preOp = false;
result.ops = result.refresh(null);
return result;
}
@@ -278,13 +276,6 @@ public class PerReplicaStatesOps {
}
/**
- * To be executed before collection state.json is persisted
- */
- public boolean isPreOp() {
- return preOp;
- }
-
- /**
* This method should compute the set of ZK operations for a given action
* for instance, a state change may result in 2 operations on per-replica states (1 CREATE and 1 DELETE)
* if a multi operation fails because the state got modified from behind,
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index ccd93c6..baae22b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -410,7 +410,7 @@ public class ZkStateReader implements SolrCloseable {
if (log.isDebugEnabled()) {
log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
}
- DocCollection nu = getCollectionLive(this, coll);
+ DocCollection nu = getCollectionLive(coll);
if (nu == null) return -1;
if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
if (updateWatchedCollection(coll, nu)) {
@@ -680,7 +680,7 @@ public class ZkStateReader implements SolrCloseable {
}
}
if (shouldFetch) {
- cachedDocCollection = getCollectionLive(ZkStateReader.this, collName);
+ cachedDocCollection = getCollectionLive(collName);
lastUpdateTime = System.nanoTime();
}
}
@@ -1193,7 +1193,7 @@ public class ZkStateReader implements SolrCloseable {
}
/**
- * Watches a single collection's format2 state.json.
+ * Watches a single collection's state.json.
*/
class StateWatcher implements Watcher {
private final String coll;
@@ -1446,9 +1446,9 @@ public class ZkStateReader implements SolrCloseable {
}
}
- public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+ public DocCollection getCollectionLive(String coll) {
try {
- return zkStateReader.fetchCollectionState(coll, null);
+ return fetchCollectionState(coll, null);
} catch (KeeperException e) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
} catch (InterruptedException e) {
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 7cd09a1..c248db8 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -120,6 +120,7 @@ public class MiniSolrCloudCluster {
" <str name=\"zkACLProvider\">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str> \n" +
" <str name=\"pkiHandlerPrivateKeyPath\">${pkiHandlerPrivateKeyPath:cryptokeys/priv_key512_pkcs8.pem}</str> \n" +
" <str name=\"pkiHandlerPublicKeyPath\">${pkiHandlerPublicKeyPath:cryptokeys/pub_key512.der}</str> \n" +
+ " <str name=\"distributedClusterStateUpdates\">${solr.distributedClusterStateUpdates:false}</str> \n" +
" </solrcloud>\n" +
// NOTE: this turns off the metrics collection unless overriden by a sysprop
" <metrics enabled=\"${metricsEnabled:false}\">\n" +
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index c6f26c6..35c87e7 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
@@ -109,6 +110,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
private Map<String, Object> clusterProperties = new HashMap<>();
private boolean trackJettyMetrics;
+ private boolean useDistributedClusterStateUpdate;
/**
* Create a builder
@@ -119,6 +121,8 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
public Builder(int nodeCount, Path baseDir) {
this.nodeCount = nodeCount;
this.baseDir = baseDir;
+ // By default the MiniSolrCloudCluster being built will randomly (seed based) decide which cluster update strategy to use
+ this.useDistributedClusterStateUpdate = LuceneTestCase.random().nextInt(2) == 0;
}
/**
@@ -187,6 +191,48 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
}
/**
+ * This method makes the MiniSolrCloudCluster use the "other" cluster state update strategy, i.e. the opposite of the
+ * one it would normally use. When some test classes call this method (and some don't) we make sure that a run of
+ * multiple tests with a single seed will exercise both code paths (distributed updates and Overseer based updates) so
+ * regressions can be spotted faster.<p>
+ *
+ * The real need is for a few tests covering reasonable use cases to call this method. If you're adding a new test,
+ * you don't have to call it (but it's ok if you do).
+ */
+ public Builder useOtherClusterStateUpdateStrategy() {
+ useDistributedClusterStateUpdate = !useDistributedClusterStateUpdate;
+ return this;
+ }
+
+ /**
+ * Force the cluster state update strategy to be either Overseer based or distributed. <b>This method can be useful when
+ * debugging tests</b> failing in only one of the two modes to have all local runs exhibit the issue, as well as, obviously,
+ * for tests that are not compatible with one of the two modes.
+ * <p>
+ * If this method is not called, the strategy being used will be random if the configuration passed to the cluster
+ * ({@code solr.xml} equivalent) contains a placeholder similar to:
+ * <pre>
+ * {@code
+ * <solrcloud>
+ * ....
+ * <str name="distributedClusterStateUpdates">${solr.distributedClusterStateUpdates:false}</str>
+ * ....
+ * </solrcloud>
+ * }</pre>
+ * For an example of a configuration supporting this setting, see {@link MiniSolrCloudCluster#DEFAULT_CLOUD_SOLR_XML}.
+ * When a test sets a different {@code solr.xml} config (using {@link #withSolrXml}), if the config does not contain
+ * the placeholder, the strategy will be defined by the value assigned to {@code useDistributedClusterStateUpdates}
+ * in {@link org.apache.solr.core.CloudConfig.CloudConfigBuilder}.
+ *
+ * @param distributed When {@code true}, cluster state updates are handled in a distributed way by nodes. When
+ * {@code false}, cluster state updates are handled by Overseer.
+ */
+ public Builder withDistributedClusterStateUpdates(boolean distributed) {
+ useDistributedClusterStateUpdate = distributed;
+ return this;
+ }
+
+ /**
* Set a cluster property
*
* @param propertyName the property name
@@ -217,6 +263,14 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
* @throws Exception if an error occurs on startup
*/
public MiniSolrCloudCluster build() throws Exception {
+ // This will have an impact on how the MiniSolrCloudCluster, and therefore the test, runs if the config being
+ // used does have the appropriate placeholder.
+ // It is a good place to hard code true or false instead of useDistributedClusterStateUpdate to run all qualifying
+ // tests with a given cluster state update strategy (non qualifying tests will use the default value assigned to
+ // useDistributedClusterStateUpdates in org.apache.solr.core.CloudConfig.CloudConfigBuilder, so if you really want
+ // ALL tests to run with a given strategy, patch it there too, and revert before commit!).
+ System.setProperty("solr.distributedClusterStateUpdates", Boolean.toString(useDistributedClusterStateUpdate));
+
JettyConfig jettyConfig = jettyConfigBuilder.build();
MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(nodeCount, baseDir, solrxml, jettyConfig,
null, securityJson, trackJettyMetrics);