Posted to commits@lucene.apache.org by il...@apache.org on 2021/02/13 01:42:35 UTC

[lucene-solr] branch master updated: SOLR-14928: allow cluster state updates to be done in a distributed way and not through Overseer (#2364)

This is an automated email from the ASF dual-hosted git repository.

ilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 23755dd  SOLR-14928: allow cluster state updates to be done in a distributed way and not through Overseer (#2364)
23755dd is described below

commit 23755ddfdd36a9613010cb9e6201127df55be744
Author: Ilan Ginzburg <il...@gmail.com>
AuthorDate: Sat Feb 13 02:42:18 2021 +0100

    SOLR-14928: allow cluster state updates to be done in a distributed way and not through Overseer (#2364)
---
 solr/CHANGES.txt                                   |   2 +
 .../solr/cloud/DistributedClusterStateUpdater.java | 822 +++++++++++++++++++++
 .../src/java/org/apache/solr/cloud/Overseer.java   |  61 +-
 .../apache/solr/cloud/OverseerNodePrioritizer.java |  18 +-
 .../solr/cloud/RefreshCollectionMessage.java       |   3 +-
 .../solr/cloud/ShardLeaderElectionContext.java     |  15 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   9 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  82 +-
 .../solr/cloud/api/collections/AddReplicaCmd.java  |  18 +-
 .../solr/cloud/api/collections/AliasCmd.java       |   2 +-
 .../cloud/api/collections/CreateCollectionCmd.java |  70 +-
 .../solr/cloud/api/collections/CreateShardCmd.java |  16 +-
 .../cloud/api/collections/DeleteCollectionCmd.java |  11 +-
 .../solr/cloud/api/collections/DeleteShardCmd.java |  22 +-
 .../solr/cloud/api/collections/MigrateCmd.java     |   8 +-
 .../OverseerCollectionMessageHandler.java          |  46 +-
 .../cloud/api/collections/OverseerStatusCmd.java   |  10 +
 .../api/collections/ReindexCollectionCmd.java      |  29 +-
 .../solr/cloud/api/collections/RestoreCmd.java     |  52 +-
 .../solr/cloud/api/collections/SplitShardCmd.java  |  62 +-
 .../solr/cloud/overseer/CollectionMutator.java     |  23 +-
 .../apache/solr/cloud/overseer/NodeMutator.java    |  90 ++-
 .../apache/solr/cloud/overseer/SliceMutator.java   |   2 +-
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  31 +-
 .../apache/solr/cloud/overseer/ZkWriteCommand.java |  43 +-
 .../src/java/org/apache/solr/core/CloudConfig.java |  17 +-
 .../java/org/apache/solr/core/SolrXmlConfig.java   |   3 +
 .../solr/handler/admin/CollectionsHandler.java     |  32 +-
 .../processor/DistributedZkUpdateProcessor.java    |  12 +-
 .../solr/cloud/CreateCollectionCleanupTest.java    |   2 +
 .../org/apache/solr/cloud/DeleteReplicaTest.java   |  34 +-
 .../org/apache/solr/cloud/DeleteShardTest.java     |  13 +-
 .../test/org/apache/solr/cloud/MockSolrSource.java |  15 +-
 .../OverseerCollectionConfigSetProcessorTest.java  | 266 +++++--
 .../org/apache/solr/cloud/OverseerRolesTest.java   |   7 +-
 .../org/apache/solr/cloud/OverseerStatusTest.java  |  35 +-
 .../test/org/apache/solr/cloud/OverseerTest.java   |  43 +-
 .../solr/cloud/TestRandomRequestDistribution.java  |  11 +-
 .../solr/cloud/TestSkipOverseerOperations.java     |  29 +-
 .../org/apache/solr/cloud/ZkControllerTest.java    |  21 +-
 solr/server/solr/solr.xml                          |   1 +
 solr/solr-ref-guide/src/format-of-solr-xml.adoc    |   4 +
 .../solr/common/cloud/PerReplicaStatesOps.java     |   9 -
 .../apache/solr/common/cloud/ZkStateReader.java    |  10 +-
 .../apache/solr/cloud/MiniSolrCloudCluster.java    |   1 +
 .../org/apache/solr/cloud/SolrCloudTestCase.java   |  54 ++
 46 files changed, 1790 insertions(+), 376 deletions(-)
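
Note on enabling the new behavior: it is opt-in. Judging from the CloudConfig, SolrXmlConfig and solr.xml changes
listed above, distributed cluster state updates are controlled by a flag in the <solrcloud> section of solr.xml,
presumably along these lines (the exact name is inferred from CloudConfig.getDistributedClusterStateUpdates() used
further below, so treat it as an assumption):

    <bool name="distributedClusterStateUpdates">${solr.distributedClusterStateUpdates:false}</bool>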

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 378cb54..c05cd84 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -201,6 +201,8 @@ Other Changes
 
 * SOLR-15118: Switch /v2/collections APIs over to the now-preferred annotated-POJO implementation approach (Jason Gerlowski)
 
+* SOLR-14928: Allow cluster state updates to be done in a distributed fashion without going through Overseer (Ilan Ginzburg)
+
 Bug Fixes
 ---------------------
 * SOLR-14546: Fix for a relatively hard to hit issue in OverseerTaskProcessor that could lead to out of order execution
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
new file mode 100644
index 0000000..f57cc31
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -0,0 +1,822 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.cloud.overseer.*;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.*;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.*;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.cloud.overseer.ZkStateWriter.NO_OP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTIONS_ZKNODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+
+/**
+ * Gives access to distributed cluster state update methods and allows code to inquire whether distributed state update is enabled.
+ */
+public class DistributedClusterStateUpdater {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * When {@code true}, each node updates Zookeeper directly to change {@code state.json} files. When {@code false}, messages
+   * are instead sent to the Overseer and the update is done there.
+   */
+  private final boolean useDistributedStateUpdate;
+
+  /**
+   * Builds an instance with the specified behavior regarding distribution of state updates: when parameter
+   * {@code useDistributedStateUpdate} is {@code false}, distributed updates are not enabled; when it is {@code true},
+   * the instance gives access to the methods and classes that execute the updates.
+   *
+   * @param useDistributedStateUpdate when this parameter is {@code false}, the only method expected to ever be called on this
+   *                                  instance is {@link #isDistributedStateUpdate}, and it will return {@code false}.
+   */
+  public DistributedClusterStateUpdater(boolean useDistributedStateUpdate) {
+    this.useDistributedStateUpdate = useDistributedStateUpdate;
+    if (log.isInfoEnabled()) {
+      log.info("Creating DistributedClusterStateUpdater with useDistributedStateUpdate=" + useDistributedStateUpdate
+      + ". Solr will be using " + (useDistributedStateUpdate ? "distributed" : "Overseer based") + " cluster state updates."); // nowarn
+    }
+  }
+
+  /**
+   * Create a new instance of {@link StateChangeRecorder} for a given collection and a given intention (collection
+   * creation vs. operations on an existing collection).
+   */
+  public StateChangeRecorder createStateChangeRecorder(String collectionName, boolean isCollectionCreation) {
+    if (!useDistributedStateUpdate) {
+      // Seeing this exception or any other of this kind here means there's a big bug in the code. No user input can cause this.
+      throw new IllegalStateException("Not expecting to create instances of StateChangeRecorder when not using distributed state update");
+    }
+    return new StateChangeRecorder(collectionName, isCollectionCreation);
+  }
+
+  /**
+   * Syntactic sugar to allow a single change to the cluster state to be made in a single call.
+   */
+  public void doSingleStateUpdate(MutatingCommand command, ZkNodeProps message,
+                                  SolrCloudManager scm, ZkStateReader zkStateReader) throws KeeperException, InterruptedException {
+    if (!useDistributedStateUpdate) {
+      throw new IllegalStateException("Not expecting to execute doSingleStateUpdate when not using distributed state update");
+    }
+    String collectionName = command.getCollectionName(message);
+    final StateChangeRecorder scr = new StateChangeRecorder(collectionName, command.isCollectionCreation());
+    scr.record(command, message);
+    scr.executeStateUpdates(scm, zkStateReader);
+  }
+
+  public void executeNodeDownStateUpdate(String nodeName, ZkStateReader zkStateReader) {
+    if (!useDistributedStateUpdate) {
+      throw new IllegalStateException("Not expecting to execute executeNodeDownStateUpdate when not using distributed state update");
+    }
+    CollectionNodeDownChangeCalculator.executeNodeDownStateUpdate(nodeName, zkStateReader);
+  }
+
+  /**
+   * When this method returns {@code false}, the legacy behavior of enqueueing cluster state update messages to the Overseer
+   * should be used, and no other method of this class should be called.
+   */
+  public boolean isDistributedStateUpdate() {
+    return useDistributedStateUpdate;
+  }
+
+  /**
+   * Each enum instance is named after the mutator object name (e.g. {@code Cluster} for {@link ClusterStateMutator} or
+   * {@code Collection} for {@link CollectionMutator}) followed by the method name of the mutator.
+   * For example {@link #SliceAddReplica} represents {@link SliceMutator#addReplica}.
+   * <p>
+   * Even though the various mutator classes do not implement any common interface, their constructors and methods conveniently
+   * take the same set of parameters, so all can be called from the enum method {@link #buildWriteCommand(SolrCloudManager, ClusterState, ZkNodeProps)}.
+   * <p>
+   * Given that {@link OverseerAction#DOWNNODE} is different (it returns a list of write commands and impacts more than one collection),
+   * it is handled separately in {@link CollectionNodeDownChangeCalculator#executeNodeDownStateUpdate}.
+   */
+  public enum MutatingCommand {
+    BalanceShardsUnique(BALANCESHARDUNIQUE, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        ExclusiveSliceProperty dProp = new ExclusiveSliceProperty(cs, message);
+        // Next line is where the actual work is done
+        if (dProp.balanceProperty()) {
+          return new ZkWriteCommand(getCollectionName(message), dProp.getDocCollection());
+        } else {
+          return NO_OP;
+        }
+      }
+    },
+    ClusterCreateCollection(CREATE, CommonParams.NAME) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new ClusterStateMutator(scm).createCollection(cs, message);
+      }
+
+      @Override
+      public boolean isCollectionCreation() {
+        return true;
+      }
+    },
+    ClusterDeleteCollection(DELETE, CommonParams.NAME) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new ClusterStateMutator(scm).deleteCollection(cs, message);
+      }
+    },
+    CollectionDeleteShard(DELETESHARD, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new CollectionMutator(scm).deleteShard(cs, message);
+      }
+    },
+    CollectionModifyCollection(MODIFYCOLLECTION, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new CollectionMutator(scm).modifyCollection(cs, message);
+      }
+    },
+    CollectionCreateShard(CREATESHARD, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new CollectionMutator(scm).createShard(cs, message);
+      }
+    },
+    ReplicaAddReplicaProperty(ADDREPLICAPROP, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new ReplicaMutator(scm).addReplicaProperty(cs, message);
+      }
+    },
+    ReplicaDeleteReplicaProperty(DELETEREPLICAPROP, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new ReplicaMutator(scm).deleteReplicaProperty(cs, message);
+      }
+    },
+    ReplicaSetState(null, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new ReplicaMutator(scm).setState(cs, message);
+      }
+    },
+    SliceAddReplica(ADDREPLICA, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new SliceMutator(scm).addReplica(cs, message);
+      }
+    },
+    SliceAddRoutingRule(null, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new SliceMutator(scm).addRoutingRule(cs, message);
+      }
+    },
+    SliceRemoveReplica(null, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new SliceMutator(scm).removeReplica(cs, message);
+      }
+    },
+    SliceRemoveRoutingRule(null, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new SliceMutator(scm).removeRoutingRule(cs, message);
+      }
+    },
+    SliceSetShardLeader(null, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new SliceMutator(scm).setShardLeader(cs, message);
+      }
+    },
+    SliceUpdateShardState(null, ZkStateReader.COLLECTION_PROP) {
+      @Override
+      public ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message) {
+        return new SliceMutator(scm).updateShardState(cs, message);
+      }
+    };
+
+    private static final EnumMap<CollectionParams.CollectionAction, MutatingCommand> actionsToCommands;
+
+    static {
+      actionsToCommands = new EnumMap<>(CollectionParams.CollectionAction.class);
+      for (MutatingCommand mc : MutatingCommand.values()) {
+        if (mc.collectionAction != null) {
+          actionsToCommands.put(mc.collectionAction, mc);
+        }
+      }
+    }
+
+    private final CollectionParams.CollectionAction collectionAction;
+    private final String collectionNameParamName;
+
+    MutatingCommand(CollectionParams.CollectionAction collectionAction, String collectionNameParamName) {
+      this.collectionAction = collectionAction;
+      this.collectionNameParamName = collectionNameParamName;
+    }
+
+    /**
+     * Mutating commands that return a single {@link ZkWriteCommand} override this method.
+     */
+    public abstract ZkWriteCommand buildWriteCommand(SolrCloudManager scm, ClusterState cs, ZkNodeProps message);
+
+    public String getCollectionName(ZkNodeProps message) {
+      return message.getStr(collectionNameParamName);
+    }
+
+    /**
+     * @return the {@link MutatingCommand} corresponding to the passed {@link org.apache.solr.common.params.CollectionParams.CollectionAction} or
+     * {@code null} if no cluster state update command is defined for that action (given that {@link org.apache.solr.common.params.CollectionParams.CollectionAction}s
+     * are used for the Collection API and only some correspond to cluster state updates, this is expected).
+     */
+    public static MutatingCommand getCommandFor(CollectionParams.CollectionAction collectionAction) {
+      return actionsToCommands.get(collectionAction);
+    }
+
+    /**
+     * Given that only one command ({@link #ClusterCreateCollection}) creates a collection, the default implementation is provided here.
+     */
+    public boolean isCollectionCreation() {
+      return false;
+    }
+  }
+
+  /**
+   * Instances of this class are the fundamental building block of the CAS (Compare and Swap) update approach. These instances
+   * accept an initial cluster state (essentially as present in Zookeeper) and apply to it a set of modifications that are
+   * then written back to Zookeeper ({@link ZkUpdateApplicator} drives this process).
+   * If the update fails (due to a concurrent update), the Zookeeper content is read again, the changes (updates) are
+   * applied to it again and a new write attempt is made. This guarantees that an update does not overwrite data just
+   * written by a concurrent update happening from the same or from another node.
+   */
+  interface StateChangeCalculator {
+    String getCollectionName();
+
+    /**
+     * @return {@code true} if this updater is computing updates for creating a collection that does not exist yet.
+     */
+    boolean isCollectionCreation();
+
+    /**
+     * Given an initial {@link ClusterState}, computes the updated cluster state to be written to {@code state.json}
+     * (made available through {@link #getUpdatedClusterState()}) as well as the list of per replica operations (made available
+     * through {@link #getPerReplicaStatesOps()}). Either or both of these methods will return {@code null} if there is no
+     * corresponding update to apply.
+     */
+    void computeUpdates(ClusterState currentState);
+
+    /**
+     * Method can only be called after {@link #computeUpdates} has been called.
+     * @return the new state to write into {@code state.json} or {@code null} if no update needed.
+     */
+    ClusterState getUpdatedClusterState();
+
+    /**
+     * Method can only be called after {@link #computeUpdates} has been called.
+     * @return {@code null} when there are no per replica state ops
+     */
+    List<PerReplicaStatesOps> getPerReplicaStatesOps();
+  }
+
+  /**
+   * This class is passed a {@link StateChangeCalculator} targeting a single collection; the calculator applies an update to an
+   * initial cluster state and returns the updated cluster state. The {@link StateChangeCalculator} is used (possibly multiple times)
+   * to do a Compare And Swap (a.k.a. conditional update or CAS) of the collection's {@code state.json} Zookeeper file.<p>
+   *
+   * When there are per replica states to update, they are attempted once (they do their own Compare And Swap) before
+   * the (potentially multiple) attempts to update the {@code state.json} file. This conforms to the strategy used
+   * when {@code state.json} updates are handled by the Overseer. See {@link ZkStateWriter#writePendingUpdates}.
+   */
+  static private class ZkUpdateApplicator {
+    /**
+     * When trying to update a {@code state.json} file that keeps getting changed by concurrent updaters, the number of attempts
+     * made before giving up. This is likely way too high: if we get to 50 failed attempts, something else went wrong.
+     * To be reconsidered once Collection API commands are distributed as well.
+     */
+    public static final int CAS_MAX_ATTEMPTS = 50;
+
+    private final ZkStateReader zkStateReader;
+    private final StateChangeCalculator updater;
+
+    static void applyUpdate(ZkStateReader zkStateReader, StateChangeCalculator updater) throws KeeperException, InterruptedException {
+      ZkUpdateApplicator zua = new ZkUpdateApplicator(zkStateReader, updater);
+      zua.applyUpdate();
+    }
+
+    private ZkUpdateApplicator(ZkStateReader zkStateReader, StateChangeCalculator updater) {
+      this.zkStateReader = zkStateReader;
+      this.updater = updater;
+    }
+
+    /**
+     * By delegating work to {@link PerReplicaStatesOps} for per replica state updates, and using optimistic locking
+     * (with retries) to directly update the content of {@code state.json}, updates Zookeeper with the changes computed
+     * by the {@link StateChangeCalculator}.
+     */
+    private void applyUpdate() throws KeeperException, InterruptedException {
+      /* Initial slightly naive implementation (later on we should consider some caching between updates...).
+       * For updates:
+       * - Read the state.json file from Zookeeper
+       * - Run the updater to execute the changes on top of that file
+       * - Compare and Swap the file with the new version (fail if something else changed ZK in the meantime)
+       * - Retry a few times all above steps if update is failing.
+       *
+       * For creations:
+       * - Build the state.json file using the updater
+       * - Try to write it to Zookeeper (do not overwrite if it exists)
+       * - Fail (without retries) if write failed.
+       */
+
+      // Note we DO NOT track nor use the live nodes in the cluster state.
+      // That may mean the two abstractions (collection metadata vs. nodes) should be separated.
+      // For now trying to diverge as little as possible from existing data structures and code given the need to
+      // support both the old way (Overseer) and new way (distributed) of handling cluster state update.
+      final Set<String> liveNodes = Collections.emptySet();
+
+      // Per Replica States updates are done before all other updates and not subject to the number of attempts of CAS
+      // made here, given they have their own CAS strategy and implementation (see PerReplicaStatesOps.persist()).
+      boolean firstAttempt = true;
+
+      // When there are multiple retries of state.json write and the cluster state gets updated over and over again with
+      // the changes done in the per replica states, we avoid refetching those multiple times.
+      PerReplicaStates fetchedPerReplicaStates = null;
+
+      // Later on (when Collection API commands are distributed) we will have to rely on the version of state.json
+      // to implement the replacement of Collection API locking. Then we should not blindly retry cluster state updates
+      // as we do here but instead intelligently fail (or retry completely) the Collection API call when seeing that
+      // state.json was changed by a concurrent command execution.
+      // The loop below is ok for distributing cluster state updates from Overseer to all nodes while Collection API
+      // commands are still executed on the Overseer and manage their locking the old fashioned way.
+      for (int attempt = 0; attempt < CAS_MAX_ATTEMPTS; attempt++) {
+        // Start by reading the current state.json (if this is an update).
+        // TODO Eventually rethink the way each node manages and caches its copy of the cluster state. Knowing about all collections in the cluster might not be needed.
+        ClusterState initialClusterState;
+        if (updater.isCollectionCreation()) {
+          initialClusterState = new ClusterState(liveNodes, Collections.emptyMap());
+        } else {
+          // Get the state for existing data in ZK (and if no data exists we should fail)
+          initialClusterState = fetchStateForCollection();
+        }
+
+        // Apply the desired changes. Note that the cluster state passed to the chain of mutators is totally up to date
+        // (it's read from ZK just above). So assumptions made in the mutators (like SliceMutator.removeReplica() deleting
+        // the whole collection if it's not found) are ok. Actually in the removeReplica case, the collection will always
+        // exist otherwise the call to fetchStateForCollection() above would have failed.
+        updater.computeUpdates(initialClusterState);
+
+        ClusterState updatedState = updater.getUpdatedClusterState();
+        List<PerReplicaStatesOps> allStatesOps = updater.getPerReplicaStatesOps();
+
+        if (firstAttempt && allStatesOps != null) {
+          // Do the per replica states updates (if any) before the state.json update (if any)
+          firstAttempt = false;
+
+          // The parent node of the per replica state nodes happens to be the node of state.json.
+          String prsParentNode = ZkStateReader.getCollectionPath(updater.getCollectionName());
+
+          for (PerReplicaStatesOps prso : allStatesOps) {
+            prso.persist(prsParentNode, zkStateReader.getZkClient());
+          }
+        }
+
+        if (updatedState == null) {
+          // No update to state.json needed
+          return;
+        }
+
+        // Get the latest version of the collection from the cluster state first.
+        // There is no notion of "cached" here (the boolean passed below) as the updatedState is based on a CollectionRef.
+        DocCollection docCollection = updatedState.getCollectionOrNull(updater.getCollectionName(), true);
+
+        // If we did update per replica states and we're also updating state.json, update the content of state.json to reflect
+        // the changes made to replica states. Not strictly necessary (the state source of truth is in per replica states), but nice to have...
+        if (allStatesOps != null) {
+          if (docCollection != null) {
+            // Fetch the per replica states updates done previously or skip fetching if we already have them
+            fetchedPerReplicaStates = PerReplicaStates.fetch(docCollection.getZNode(), zkStateReader.getZkClient(), fetchedPerReplicaStates);
+            // Transpose the per replica states into the cluster state
+            updatedState = updatedState.copyWith(updater.getCollectionName(), docCollection.copyWith(fetchedPerReplicaStates));
+          }
+        }
+
+        try {
+          // Try to do a conditional update (a.k.a. CAS: compare and swap).
+          doStateDotJsonCasUpdate(updatedState);
+          return; // state.json updated successfully.
+        } catch (KeeperException.BadVersionException bve) {
+          if (updater.isCollectionCreation()) {
+            // Not expecting to see this exception when creating a new state.json, so throwing it up the food chain.
+            throw bve;
+          }
+        }
+        // We've tried to update an existing state.json and got a BadVersionException. We'll try again a few times.
+        // When only two threads compete, no point in waiting: if we lost this time we'll get it next time right away.
+        // But if more threads compete, then waiting a bit (random delay) can improve our chances. The delay should likely
+        // be proportional to the time between reading the cluster state and updating it. We can measure it in the loop above.
+        // With "per replica states" collections, concurrent attempts of even just two threads are expected to be extremely rare.
+      }
+
+      // We made quite a few attempts but failed repeatedly. This is pretty bad but we can't loop trying forever.
+      // Offering a job to the Overseer wouldn't usually fail if the ZK queue can be written to (but the Overseer can then
+      // loop forever attempting the update).
+      // We do want whoever called us to fail right away rather than to wait for a cluster change and timeout because it
+      // didn't happen. Likely need to review call by call what is the appropriate behaviour, especially once Collection
+      // API is distributed (because then the Collection API call will fail if the underlying cluster state update cannot
+      // be done, and that's a desirable thing).
+      throw new KeeperException.BadVersionException(ZkStateReader.getCollectionPath(updater.getCollectionName()));
+    }
+
+    /**
+     * Once the new {@link ClusterState} containing all needed updates to the collection has been computed by the
+     * {@link StateChangeCalculator}, this method updates the collection's {@code state.json} in ZK. It is the
+     * equivalent of Overseer's {@link ZkStateWriter#writePendingUpdates} (in its actions related to {@code state.json}
+     * as opposed to the per replica states).
+     * <p>
+     * Note that in a similar way to what happens in {@link ZkStateWriter#writePendingUpdates}, collection delete is handled
+     * as a special case (see the comment on {@link DistributedClusterStateUpdater.StateChangeRecorder.RecordedMutationsPlayer}
+     * for why the code has to be duplicated).<p>
+     *
+     * <b>Note for the future:</b> Given this method is where the actual write to ZK is done, it is the place where we
+     * can rebuild a DocCollection with an updated ZK version. Eventually, if we maintain a cache of recently used collections,
+     * we want to capture the updated collection and put it in the cache to avoid reading it again (if it changed in the
+     * meantime, the CAS will fail and we will refresh).<p>
+     *
+     * This could serve as the basis for a strategy where each node does not need any view of all collections in the cluster
+     * but only a cache of recently used collections (possibly not even needing watches on them, but we'll discuss this later).
+     */
+    private void doStateDotJsonCasUpdate(ClusterState updatedState) throws KeeperException, InterruptedException {
+      String jsonPath = ZkStateReader.getCollectionPath(updater.getCollectionName());
+
+      // Collection delete
+      if (!updatedState.hasCollection(updater.getCollectionName())) {
+        // We do not have a collection znode version to verify we delete the right version of state.json. But this doesn't really matter:
+        // if we had one, and the delete failed (because state.json got updated in the meantime), we would re-read the collection
+        // state, update our version, run the CAS delete again and it would pass. Which means that one way or another, deletes are final.
+        // I hope nobody deletes a collection then creates a new one with the same name immediately (although the creation should fail
+        // if the znode still exists, so the creation would only succeed after the delete made it, and we're ok).
+        // With Overseer based updates the same behavior can be observed: a collection update is enqueued followed by the
+        // collection delete before the update was executed.
+        log.debug("going to recursively delete state.json at {}", jsonPath);
+        zkStateReader.getZkClient().clean(jsonPath);
+      } else {
+        // Collection update or creation
+        DocCollection collection = updatedState.getCollection(updater.getCollectionName());
+        byte[] stateJson = Utils.toJSON(singletonMap(updater.getCollectionName(), collection));
+
+        if (updater.isCollectionCreation()) {
+          // The state.json file does not exist yet (more precisely it is assumed not to exist)
+          log.debug("going to create collection {}", jsonPath);
+          zkStateReader.getZkClient().create(jsonPath, stateJson, CreateMode.PERSISTENT, true);
+        } else {
+          // We're updating an existing state.json
+          if (log.isDebugEnabled()) {
+            log.debug("going to update collection {} version: {}", jsonPath, collection.getZNodeVersion());
+          }
+          zkStateReader.getZkClient().setData(jsonPath, stateJson, collection.getZNodeVersion(), true);
+        }
+      }
+    }
+
+    /**
+     * Creates a {@link ClusterState} with the state of an existing single collection, with no live nodes information.
+     * Eventually this state should be reused across calls if it is fresh enough... (we have to deal with failures
+     * of conditional updates anyway, so trying to use non-fresh data is ok; a second attempt will be made)
+     */
+    private ClusterState fetchStateForCollection() throws KeeperException, InterruptedException {
+      String collectionStatePath = ZkStateReader.getCollectionPath(updater.getCollectionName());
+      Stat stat = new Stat();
+      byte[] data = zkStateReader.getZkClient().getData(collectionStatePath, null, stat, true);
+      ClusterState clusterState = ClusterState.createFromJson(stat.getVersion(), data, Collections.emptySet());
+      return clusterState;
+    }
+  }
+
+  /**
+   * Class handling the distributed update of a collection's Zookeeper {@code state.json} file based on multiple updates
+   * applied to a single collection (as is sometimes done by *Cmd classes implementing the Collection API commands).<p>
+   * Previously these updates were sent one by one to the Overseer and then grouped by org.apache.solr.cloud.Overseer.ClusterStateUpdater.
+   * <p>
+   * Records desired changes to {@code state.json} files in Zookeeper (as are done by the family of mutator classes such as
+   * {@link org.apache.solr.cloud.overseer.ClusterStateMutator}, {@link org.apache.solr.cloud.overseer.CollectionMutator}
+   * etc.) in order to be able to later execute them on the actual content of the {@code state.json} files using optimistic
+   * locking (and retry a few times if the optimistic locking failed).
+   * <p>
+   * Instances are <b>not</b> thread safe.
+   */
+  public static class StateChangeRecorder {
+    final List<Pair<MutatingCommand, ZkNodeProps>> mutations;
+    /**
+     * The collection name to which all recorded commands apply.
+     */
+    final String collectionName;
+    /**
+     * {@code true} if recorded commands assume creation of the collection {@code state.json} file.<br>
+     * {@code false} if an existing {@code state.json} is to be updated.<p>
+     * <p>
+     * This variable is used for defensive programming and catching issues. It might be removed once the distribution
+     * of cluster state updates is fully implemented and tested.
+     */
+    final boolean isCollectionCreation;
+
+    /**
+     * For collection creation recording, there should be only one actual creation (and it should be the first recorded command).
+     */
+    boolean creationCommandRecorded = false;
+
+    private StateChangeRecorder(String collectionName, boolean isCollectionCreation) {
+      if (collectionName == null) {
+        final String err = "Internal bug. collectionName=null (isCollectionCreation=" + isCollectionCreation + ")";
+        log.error(err);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+      }
+      mutations = new LinkedList<>();
+      this.collectionName = collectionName;
+      this.isCollectionCreation = isCollectionCreation;
+    }
+
+    /**
+     * Records a mutation method and its parameters so that it can be executed later to modify the corresponding Zookeeper state.
+     * Note the message is identical to the one used for communicating with Overseer (at least initially) so it also contains
+     * the action in parameter {@link org.apache.solr.cloud.Overseer#QUEUE_OPERATION}, but that value is ignored here
+     * in favor of the value passed in {@code command}.
+     *
+     * @param message the parameters associated with the command that are kept in the recorded mutations to be played
+     *                later. Note that this call usually replaces a call to {@link org.apache.solr.cloud.Overseer#offerStateUpdate(byte[])}
+     *                that is passed a <b>copy</b> of the data!<br>
+     *                This means that if the {@code message} passed in here is reused before the recorded commands are replayed,
+     *                things will break! All places calling this method must not reuse the data passed in
+     *                (otherwise a copy must be made).
+     */
+    public void record(MutatingCommand command, ZkNodeProps message) {
+      if (isCollectionCreation && !creationCommandRecorded) {
+        // First received command should be collection creation
+        if (!command.isCollectionCreation()) {
+          final String err = "Internal bug. Creation of collection " + collectionName + " unexpected command " + command.name();
+          log.error(err);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+        }
+        creationCommandRecorded = true;
+      } else {
+        // If collection creation already received or not expected, should not get (another) one
+        if (command.isCollectionCreation()) {
+          final String err = "Internal bug. Creation of collection " + collectionName + " unexpected command " +
+              command.name() + " (isCollectionCreation=" + isCollectionCreation + ", creationCommandRecorded=" + creationCommandRecorded + ")";
+          log.error(err);
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+        }
+      }
+
+      if (!collectionName.equals(command.getCollectionName(message))) {
+        // All recorded commands must be for same collection
+        final String err = "Internal bug. State change for collection " + collectionName +
+            " received command " + command + " for collection " + command.getCollectionName(message);
+        log.error(err);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+      }
+
+      mutations.add(new Pair<>(command, message));
+    }
+
+    /**
+     * This class takes the initial (passed in) cluster state, applies cluster mutations to it and returns the resulting
+     * cluster state.
+     * <p>
+     * It is used to apply a set of changes to the cluster state multiple times when the Compare And Swap (conditional
+     * update) fails due to concurrent modification.
+     * <p>
+     * For each mutation, a {@link ZkWriteCommand} is first created (capturing how the mutation impacts the cluster state); this is
+     * the equivalent of what the Overseer does in ClusterStateUpdater.processMessage().
+     * <p>
+     * Then, a new {@link ClusterState} is built by replacing the existing collection with its new value as computed in the
+     * {@link ZkWriteCommand}. The Overseer does this in {@link ZkStateWriter#enqueueUpdate} ({@link ZkStateWriter} is hard
+     * to reuse because although it contains the update logic that would be needed here, it is coupled with the
+     * actual instance of {@link ClusterState} being maintained, the stream of updates to be applied to it and the application of
+     * the per replica state changes).
+     */
+    private static class RecordedMutationsPlayer implements StateChangeCalculator {
+      private final SolrCloudManager scm;
+      private final String collectionName;
+      private final boolean isCollectionCreation;
+      final List<Pair<MutatingCommand, ZkNodeProps>> mutations;
+
+      // null means no update to state.json needed. Set in computeUpdates()
+      private ClusterState computedState = null;
+
+      // null means no updates needed to the per replica state znodes. Set in computeUpdates()
+      private List<PerReplicaStatesOps> replicaOpsList = null;
+
+      RecordedMutationsPlayer(SolrCloudManager scm, String collectionName, boolean isCollectionCreation, List<Pair<MutatingCommand, ZkNodeProps>> mutations) {
+        this.scm = scm;
+        this.collectionName = collectionName;
+        this.isCollectionCreation = isCollectionCreation;
+        this.mutations = mutations;
+      }
+
+      @Override
+      public String getCollectionName() {
+        return collectionName;
+      }
+
+      @Override
+      public boolean isCollectionCreation() {
+        return isCollectionCreation;
+      }
+
+      @Override
+      public void computeUpdates(ClusterState clusterState) {
+        boolean hasJsonUpdates = false;
+        List<PerReplicaStatesOps> perReplicaStateOps = new LinkedList<>();
+        for (Pair<MutatingCommand, ZkNodeProps> mutation : mutations) {
+          MutatingCommand mutatingCommand = mutation.first();
+          ZkNodeProps message = mutation.second();
+          try {
+            ZkWriteCommand zkcmd = mutatingCommand.buildWriteCommand(scm, clusterState, message);
+            if (zkcmd != ZkStateWriter.NO_OP) {
+              hasJsonUpdates = true;
+              clusterState = clusterState.copyWith(zkcmd.name, zkcmd.collection);
+            }
+            if (zkcmd.ops != null && zkcmd.ops.get() != null) {
+              perReplicaStateOps.add(zkcmd.ops);
+            }
+          } catch (Exception e) {
+            // Seems weird to skip rather than fail, but that's what Overseer is doing (see ClusterStateUpdater.processQueueItem()).
+            // Maybe in the new distributed update world we should make the caller fail? (something Overseer cluster state updater can't do)
+            // To be reconsidered once Collection API commands are distributed because then cluster updates are done synchronously and
+            // have the opportunity to make the Collection API call fail directly.
+            log.error("Distributed cluster state update could not process the current clusterstate state update message, skipping the message: {}", message, e);
+          }
+        }
+
+        computedState = hasJsonUpdates ? clusterState : null;
+        replicaOpsList = perReplicaStateOps.isEmpty() ? null : perReplicaStateOps;
+      }
+
+      @Override
+      public ClusterState getUpdatedClusterState() {
+        return computedState;
+      }
+
+      @Override
+      public List<PerReplicaStatesOps> getPerReplicaStatesOps() {
+        return replicaOpsList;
+      }
+    }
+
+    /**
+     * Uses optimistic locking (with retries when needed) to update Zookeeper with the changes previously recorded by calls
+     * to {@link #record(MutatingCommand, ZkNodeProps)}.
+     */
+    public void executeStateUpdates(SolrCloudManager scm, ZkStateReader zkStateReader) throws KeeperException, InterruptedException {
+      if (log.isDebugEnabled()) {
+        log.debug("Executing updates for collection " + collectionName + ", is creation=" + isCollectionCreation + ", " + mutations.size() + " recorded mutations.", new Exception("StackTraceOnly")); // nowarn
+      }
+      if (mutations.isEmpty()) {
+        final String err = "Internal bug. Unexpected empty set of mutations to apply for collection " + collectionName;
+        log.error(err);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, err);
+      }
+
+      RecordedMutationsPlayer mutationPlayer = new RecordedMutationsPlayer(scm, collectionName, isCollectionCreation, mutations);
+      ZkUpdateApplicator.applyUpdate(zkStateReader, mutationPlayer);
+
+      // TODO update stats here for the various commands executed successfully or not?
+      // This would replace the stats about cluster state updates that the Collection API currently makes available using
+      // the OVERSEERSTATUS command, but obviously would be per node and will not have stats about queues (since there
+      // will be no queues). Would be useful in some tests though, for example TestSkipOverseerOperations.
+      // Probably better to rethink what types of stats are expected from a distributed system rather than trying to present
+      // those previously provided by a central server in the system (the Overseer).
+    }
+  }
+
+  /**
+   * This class handles the changes to be made as a result of a {@link OverseerAction#DOWNNODE} event.<p>
+   *
+   * Instances of this class deal with a single collection. Static method {@link #executeNodeDownStateUpdate} is the entry point
+   * dealing with a node going down and processing all collections.
+   */
+  private static class CollectionNodeDownChangeCalculator implements StateChangeCalculator {
+    private final String collectionName;
+    private final String nodeName;
+
+    // null means no update to state.json needed. Set in computeUpdates()
+    private ClusterState computedState = null;
+
+    // null means no updates needed to the per replica state znodes. Set in computeUpdates()
+    private List<PerReplicaStatesOps> replicaOpsList = null;
+
+    /**
+     * Entry point to mark all replicas of all collections present on a single node as being DOWN (because the node is down).
+     */
+    public static void executeNodeDownStateUpdate(String nodeName, ZkStateReader zkStateReader) {
+      // This code does a version of what NodeMutator.downNode() is doing. We can't assume we have a cache of the collections,
+      // so we're going to read all of them from ZK, fetch the state.json for each and if it has any replicas on the
+      // failed node, do an update (conditional of course) of the state.json
+
+      // For Per Replica States collections there is still a need to read state.json, but the update of state.json is replaced
+      // by a few znode deletions and creations. Might be faster or slower overall, depending on the number of impacted
+      // replicas of such a collection and the total size of that collection's state.json.
+
+      // Note code here also has to duplicate some of the work done in ZkStateReader because ZkStateReader couples reading of
+      // the cluster state and maintaining a cached copy of the cluster state. Something likely to be refactored later (once
+      // Overseer is totally removed and Zookeeper access patterns become clearer).
+
+      log.debug("DownNode state change invoked for node: {}", nodeName);
+
+      try {
+        final List<String> collectionNames = zkStateReader.getZkClient().getChildren(COLLECTIONS_ZKNODE, null, true);
+
+        // Collections are totally independent of each other. Multiple threads could share the load here (need a ZK connection for each though).
+        for (String collectionName : collectionNames) {
+          CollectionNodeDownChangeCalculator collectionUpdater = new CollectionNodeDownChangeCalculator(collectionName, nodeName);
+          ZkUpdateApplicator.applyUpdate(zkStateReader, collectionUpdater);
+        }
+      } catch (Exception e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        // Overseer behavior is to log an error and carry on when a message fails. See Overseer.ClusterStateUpdater.processQueueItem()
+        log.error("Could not successfully process DOWNNODE, giving up", e);
+      }
+    }
+
+    private CollectionNodeDownChangeCalculator(String collectionName, String nodeName) {
+      this.collectionName = collectionName;
+      this.nodeName = nodeName;
+    }
+
+    @Override
+    public String getCollectionName() {
+      return collectionName;
+    }
+
+    @Override
+    public boolean isCollectionCreation() {
+      return false;
+    }
+
+    @Override
+    public void computeUpdates(ClusterState clusterState) {
+      final DocCollection docCollection = clusterState.getCollectionOrNull(collectionName);
+      Optional<ZkWriteCommand> result = docCollection != null ? NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection) : Optional.empty();
+
+      if (docCollection == null) {
+        // This is possible but should be rare. Logging a warning in case it is seen often, as that would likely be a sign of another issue.
+        log.warn("Processing DOWNNODE, collection " + collectionName + " disappeared during iteration"); // nowarn
+      }
+
+      if (result.isPresent()) {
+        ZkWriteCommand zkcmd = result.get();
+        computedState = (zkcmd != ZkStateWriter.NO_OP) ? clusterState.copyWith(zkcmd.name, zkcmd.collection) : null;
+        replicaOpsList = (zkcmd.ops != null && zkcmd.ops.get() != null) ? Collections.singletonList(zkcmd.ops) : null;
+      } else {
+        computedState = null;
+        replicaOpsList = null;
+      }
+    }
+
+    @Override
+    public ClusterState getUpdatedClusterState() {
+      return computedState;
+    }
+
+    @Override
+    public List<PerReplicaStatesOps> getPerReplicaStatesOps() {
+      return replicaOpsList;
+    }
+  }
+}
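
For multi-step Collection API commands, the intended usage of the new StateChangeRecorder (as wired into the *Cmd
classes changed in this commit) is roughly the following sketch. The collection name and the two messages are
hypothetical, cloudManager/zkStateReader are assumed to be in scope, and exception handling is omitted:

    DistributedClusterStateUpdater dcsu = new DistributedClusterStateUpdater(true);
    DistributedClusterStateUpdater.StateChangeRecorder recorder =
        dcsu.createStateChangeRecorder("techproducts", false /* updating an existing collection */);
    // Each record() call captures a mutation; nothing is written to Zookeeper yet.
    recorder.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, addReplicaMessage);
    recorder.record(DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState, setStateMessage);
    // Replays all recorded mutations on a freshly read state.json and writes the result back
    // with a conditional update (CAS), retrying up to CAS_MAX_ATTEMPTS times on version conflicts.
    recorder.executeStateUpdates(cloudManager, zkStateReader);
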
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a3fd760..e131d8c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -596,8 +596,6 @@ public class Overseer implements SolrCloseable {
 
   private OverseerThread updaterThread;
 
-  private OverseerThread triggerThread;
-
   private final ZkStateReader reader;
 
   private final HttpShardHandler shardHandler;
@@ -616,6 +614,7 @@ public class Overseer implements SolrCloseable {
   private volatile boolean systemCollCompatCheck = true;
 
   private CloudConfig config;
+  private final DistributedClusterStateUpdater distributedClusterStateUpdater;
 
   // overseer not responsible for closing reader
   public Overseer(HttpShardHandler shardHandler,
@@ -629,6 +628,7 @@ public class Overseer implements SolrCloseable {
     this.zkController = zkController;
     this.stats = new Stats();
     this.config = config;
+    this.distributedClusterStateUpdater = new DistributedClusterStateUpdater(config.getDistributedClusterStateUpdates());
 
     this.solrMetricsContext = new SolrMetricsContext(zkController.getCoreContainer().getMetricManager(), SolrInfoBean.Group.overseer.toString(), metricTag);
   }
@@ -650,7 +650,9 @@ public class Overseer implements SolrCloseable {
 
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
-    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, getStateUpdateQueue(), adminPath, shardHandler.getShardHandlerFactory());
+    // Below is the only non-test usage of the "cluster state update" queue even when distributed cluster state updates are enabled.
+    // That queue is used to tell the Overseer to quit. As long as we have an Overseer, we need to support this.
+    OverseerNodePrioritizer overseerPrioritizer = new OverseerNodePrioritizer(reader, this, adminPath, shardHandler.getShardHandlerFactory());
     overseerCollectionConfigSetProcessor = new OverseerCollectionConfigSetProcessor(reader, id, shardHandler, adminPath, stats, Overseer.this, overseerPrioritizer, solrMetricsContext);
     ccThread = new OverseerThread(ccTg, overseerCollectionConfigSetProcessor, "OverseerCollectionConfigSetProcessor-" + id);
     ccThread.setDaemon(true);
@@ -814,6 +816,10 @@ public class Overseer implements SolrCloseable {
     return zkController.getSolrCloudManager();
   }
 
+  public DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
+    return distributedClusterStateUpdater;
+  }
+
   /**
    * For tests.
    * 
@@ -824,15 +830,6 @@ public class Overseer implements SolrCloseable {
     return updaterThread;
   }
 
-  /**
-   * For tests.
-   * @lucene.internal
-   * @return trigger thread
-   */
-  public synchronized OverseerThread getTriggerThread() {
-    return triggerThread;
-  }
-  
   public synchronized void close() {
     if (this.id != null) {
       log.info("Overseer (id={}) closing", id);
@@ -863,10 +860,6 @@ public class Overseer implements SolrCloseable {
       IOUtils.closeQuietly(ccThread);
       ccThread.interrupt();
     }
-    if (triggerThread != null)  {
-      IOUtils.closeQuietly(triggerThread);
-      triggerThread.interrupt();
-    }
     if (updaterThread != null) {
       try {
         updaterThread.join();
@@ -877,14 +870,8 @@ public class Overseer implements SolrCloseable {
         ccThread.join();
       } catch (InterruptedException e) {}
     }
-    if (triggerThread != null)  {
-      try {
-        triggerThread.join();
-      } catch (InterruptedException e)  {}
-    }
     updaterThread = null;
     ccThread = null;
-    triggerThread = null;
   }
 
   /**
@@ -904,6 +891,17 @@ public class Overseer implements SolrCloseable {
    * @return a {@link ZkDistributedQueue} object
    */
   ZkDistributedQueue getStateUpdateQueue() {
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      throw new IllegalStateException("Cluster state is done in a distributed way, should not try to access ZK queue");
+    }
+    return getStateUpdateQueue(new Stats());
+  }
+
+  /**
+   * Separated into its own method from {@link #getStateUpdateQueue()} (which does the same thing) because this one is
+   * legitimate to call even when cluster state updates are distributed, whereas the other one is not.
+   */
+  ZkDistributedQueue getOverseerQuitNotificationQueue() {
     return getStateUpdateQueue(new Stats());
   }
 
@@ -1064,6 +1062,14 @@ public class Overseer implements SolrCloseable {
   }
 
   public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
+    // When cluster state update is distributed, the Overseer cluster state update queue should only ever receive QUIT messages.
+    // These go to sendQuitToOverseer for execution path clarity.
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      final ZkNodeProps message = ZkNodeProps.load(data);
+      final String operation = message.getStr(QUEUE_OPERATION);
+      log.error("Received unexpected message on Overseer cluster state updater for " + operation + " when distributed updates are configured"); // nowarn
+      throw new RuntimeException("Message " + operation + " offered to state update queue when distributed state update is configured.");
+    }
     if (zkController.getZkClient().isClosed()) {
       throw new AlreadyClosedException();
     }
@@ -1080,7 +1086,16 @@ public class Overseer implements SolrCloseable {
 
   public interface Message {
     ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
-
   }
 
+  /**
+   * This method enqueues a QUIT message to the Overseer of the given id.
+   * The effect is similar to building the message then calling {@link #offerStateUpdate}, but this method can legitimately be called
+   * when cluster state updates are distributed (and the Overseer cluster state updater is not really used) while {@link #offerStateUpdate} cannot.
+   * Note that sending "QUIT" to the Overseer is not a cluster state update and was likely added to this queue because it was simpler.
+   */
+  public void sendQuitToOverseer(String overseerId)  throws KeeperException, InterruptedException {
+    getOverseerQuitNotificationQueue().offer(
+        Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(), ID, overseerId)));
+  }
 }
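
Taken together with the DistributedClusterStateUpdater above, the call-site pattern this enables is sketched below
(a hypothetical caller, not code from this commit; overseer, message, cloudManager and zkStateReader are assumed to
be in scope, and exception handling is omitted):

    DistributedClusterStateUpdater updater = overseer.getDistributedClusterStateUpdater();
    if (updater.isDistributedStateUpdate()) {
      // New path: this node applies the mutation to state.json itself, using CAS with retries.
      updater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica,
          message, cloudManager, zkStateReader);
    } else {
      // Legacy path: enqueue the message for the Overseer's cluster state updater thread.
      overseer.offerStateUpdate(Utils.toJSON(message));
    }
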
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
index d532f03..a027a05 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerNodePrioritizer.java
@@ -20,9 +20,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
@@ -35,8 +33,6 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.params.CommonParams.ID;
-
 /**
  * Responsible for prioritization of Overseer nodes, for example with the
  * ADDROLE collection command.
@@ -49,13 +45,16 @@ public class OverseerNodePrioritizer {
   private final String adminPath;
   private final ShardHandlerFactory shardHandlerFactory;
 
-  private ZkDistributedQueue stateUpdateQueue;
+  /**
+   * Only used to send QUIT to the overseer
+   */
+  private final Overseer overseer;
 
-  public OverseerNodePrioritizer(ZkStateReader zkStateReader, ZkDistributedQueue stateUpdateQueue, String adminPath, ShardHandlerFactory shardHandlerFactory) {
+  public OverseerNodePrioritizer(ZkStateReader zkStateReader, Overseer overseer, String adminPath, ShardHandlerFactory shardHandlerFactory) {
     this.zkStateReader = zkStateReader;
     this.adminPath = adminPath;
     this.shardHandlerFactory = shardHandlerFactory;
-    this.stateUpdateQueue = stateUpdateQueue;
+    this.overseer = overseer;
   }
 
   public synchronized void prioritizeOverseerNodes(String overseerId) throws Exception {
@@ -95,10 +94,7 @@ public class OverseerNodePrioritizer {
       invokeOverseerOp(electionNodes.get(1), "rejoin");//ask second inline to go behind
     }
     //now ask the current leader to QUIT, so that the designate can take over
-    stateUpdateQueue.offer(
-        Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
-            ID, OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()))));
-
+    overseer.sendQuitToOverseer(OverseerTaskProcessor.getLeaderId(zkStateReader.getZkClient()));
   }
 
   private void invokeOverseerOp(String electionNode, String op) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index 2f221af..c2053f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -24,7 +24,6 @@ import org.apache.zookeeper.data.Stat;
 
 /**
  * Refresh the Cluster State for a given collection
- *
  */
 public class RefreshCollectionMessage implements Overseer.Message {
   public final String collection;
@@ -44,7 +43,7 @@ public class RefreshCollectionMessage implements Overseer.Message {
       //our state is up to date
       return clusterState;
     } else {
-      coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
+      coll = overseer.getZkStateReader().getCollectionLive(collection);
       return clusterState.copyWith(collection, coll);
     }
   }
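
Overseer.Message (declared above with signature ClusterState run(ClusterState, Overseer)) is the hook that
RefreshCollectionMessage implements; instances are handed to the Overseer state updater thread through
Overseer.submit(). A minimal sketch of an implementation, for illustration only (NoOpMessage is hypothetical):

    // Hypothetical message: returning the input ClusterState unchanged leaves cluster state as-is.
    public class NoOpMessage implements Overseer.Message {
      @Override
      public ClusterState run(ClusterState clusterState, Overseer overseer) {
        return clusterState;
      }
    }

RefreshCollectionMessage has the same shape but first compares the state.json znode version (the Stat check
above) and only refetches the collection from ZK when its cached view is stale.
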
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 68b062e..9856198 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -51,16 +51,17 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
 
   private final CoreContainer cc;
   private final SyncStrategy syncStrategy;
+  private final DistributedClusterStateUpdater distributedClusterStateUpdater;
 
   private volatile boolean isClosed = false;
 
   public ShardLeaderElectionContext(LeaderElector leaderElector,
                                     final String shardId, final String collection,
                                     final String coreNodeName, ZkNodeProps props, ZkController zkController, CoreContainer cc) {
-    super(leaderElector, shardId, collection, coreNodeName, props,
-        zkController);
+    super(leaderElector, shardId, collection, coreNodeName, props, zkController);
     this.cc = cc;
-    syncStrategy = new SyncStrategy(cc);
+    this.syncStrategy = new SyncStrategy(cc);
+    this.distributedClusterStateUpdater = zkController.getDistributedClusterStateUpdater();
   }
 
   @Override
@@ -116,7 +117,13 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         // Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
             ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
-        zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+
+        if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+          distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceSetShardLeader, m,
+              zkController.getSolrCloudManager(), zkStateReader);
+        } else {
+          zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+        }
       }
 
       if (!weAreReplacement) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 531294a..2118ad0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -175,8 +175,13 @@ class ShardLeaderElectionContextBase extends ElectionContext {
       assert zkController != null;
       assert zkController.getOverseer() != null;
       DocCollection coll = zkStateReader.getCollection(this.collection);
-      if (coll == null || ZkController.sendToOverseer(coll, id)) {
-        zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+      if (coll == null || ZkController.updateStateDotJson(coll, id)) {
+        if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceSetShardLeader, m,
+              zkController.getSolrCloudManager(), zkStateReader);
+        } else {
+          zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+        }
       } else {
         PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
         PerReplicaStatesOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f5d6928..232b045 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -196,6 +196,8 @@ public class ZkController implements Closeable {
   private final CloudConfig cloudConfig;
   private final NodesSysPropsCacher sysPropsCacher;
 
+  private final DistributedClusterStateUpdater distributedClusterStateUpdater;
+
   private LeaderElector overseerElector;
 
   private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
@@ -285,6 +287,9 @@ public class ZkController implements Closeable {
 
     this.cloudConfig = cloudConfig;
 
+    // Use the configured way to do cluster state update (Overseer queue vs distributed)
+    distributedClusterStateUpdater = new DistributedClusterStateUpdater(cloudConfig.getDistributedClusterStateUpdates());
+
     this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames();
 
     // be forgiving and strip this off leading/trailing slashes
@@ -464,7 +469,11 @@ public class ZkController implements Closeable {
 
     init();
 
-    this.overseerJobQueue = overseer.getStateUpdateQueue();
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      this.overseerJobQueue = null;
+    } else {
+      this.overseerJobQueue = overseer.getStateUpdateQueue();
+    }
     this.overseerCollectionQueue = overseer.getCollectionQueue(zkClient);
     this.overseerConfigSetQueue = overseer.getConfigSetQueue(zkClient);
     this.sysPropsCacher = new NodesSysPropsCacher(getSolrCloudManager().getNodeStateProvider(),
@@ -757,6 +766,10 @@ public class ZkController implements Closeable {
     return zkStateReader.getClusterState();
   }
 
+  public DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
+    return distributedClusterStateUpdater;
+  }
+
   public SolrCloudManager getSolrCloudManager() {
     if (cloudManager != null) {
       return cloudManager;
@@ -1533,7 +1546,7 @@ public class ZkController implements Closeable {
       String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
 
       Map<String,Object> props = new HashMap<>();
-      props.put(Overseer.QUEUE_OPERATION, "state");
+      props.put(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower());
       props.put(ZkStateReader.STATE_PROP, state.toString());
       props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
       props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
@@ -1585,8 +1598,13 @@ public class ZkController implements Closeable {
         cd.getCloudDescriptor().setLastPublished(state);
       }
       DocCollection coll = zkStateReader.getCollection(collection);
-      if (forcePublish || sendToOverseer(coll, coreNodeName)) {
-        overseerJobQueue.offer(Utils.toJSON(m));
+      if (forcePublish || updateStateDotJson(coll, coreNodeName)) {
+        if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+          distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState, m,
+              getSolrCloudManager(), zkStateReader);
+        } else {
+          overseerJobQueue.offer(Utils.toJSON(m));
+        }
       } else {
         if (log.isDebugEnabled()) {
           log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
@@ -1601,9 +1619,9 @@ public class ZkController implements Closeable {
   }
 
   /**
-   * Whether a message needs to be sent to overseer or not
+   * Returns {@code true} if a message needs to be sent to overseer (or done in a distributed way) to update state.json for the collection
    */
-  static boolean sendToOverseer(DocCollection coll, String replicaName) {
+  static boolean updateStateDotJson(DocCollection coll, String replicaName) {
     if (coll == null) return true;
     if (!coll.isPerReplicaState()) return true;
     Replica r = coll.getReplica(replicaName);
@@ -1663,22 +1681,20 @@ public class ZkController implements Closeable {
     }
     CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
     if (removeCoreFromZk) {
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-          OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+          ZkStateReader.CORE_NAME_PROP, coreName,
           ZkStateReader.NODE_NAME_PROP, getNodeName(),
           ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
           ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
-      overseerJobQueue.offer(Utils.toJSON(m));
+      if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+        distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+            getSolrCloudManager(), zkStateReader);
+      } else {
+        overseerJobQueue.offer(Utils.toJSON(m));
+      }
     }
   }
 
-  public void createCollection(String collection) throws Exception {
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, getNodeName(),
-        ZkStateReader.COLLECTION_PROP, collection);
-    overseerJobQueue.offer(Utils.toJSON(m));
-  }
-
   public ZkStateReader getZkStateReader() {
     return zkStateReader;
   }
@@ -2036,6 +2052,9 @@ public class ZkController implements Closeable {
   }
 
   public ZkDistributedQueue getOverseerJobQueue() {
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      throw new IllegalStateException("Cluster is configured with distributed state updates, the Overseer job queue should not be retrieved");
+    }
     return overseerJobQueue;
   }
 
@@ -2632,17 +2651,26 @@ public class ZkController implements Closeable {
    */
   public void publishNodeAsDown(String nodeName) {
     log.info("Publish node={} as DOWN", nodeName);
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
-        ZkStateReader.NODE_NAME_PROP, nodeName);
-    try {
-      overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
-    } catch (AlreadyClosedException e) {
-      log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      log.debug("Publish node as down was interrupted.");
-    } catch (KeeperException e) {
-      log.warn("Could not publish node as down: ", e);
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      // Note that with the current implementation, when distributed cluster state updates are enabled, we mark the node
+      // down synchronously from this thread, whereas the Overseer based cluster state update frees this thread right away
+      // and the Overseer marks the node down asynchronously by updating all affected collections.
+      // If this is an issue (i.e. takes too long), the call below should be executed from another thread so that
+      // the calling thread returns immediately.
+      distributedClusterStateUpdater.executeNodeDownStateUpdate(nodeName, zkStateReader);
+    } else {
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
+          ZkStateReader.NODE_NAME_PROP, nodeName);
+      try {
+        overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
+      } catch (AlreadyClosedException e) {
+        log.info("Not publishing node as DOWN because a resource required to do so is already closed.");
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        log.debug("Publish node as down was interrupted.");
+      } catch (KeeperException e) {
+        log.warn("Could not publish node as down: ", e);
+      }
     }
   }
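
Should the synchronous distributed node-down update above ever prove too slow, the offloading suggested in
the comment could look like this sketch (the executor field and the method are assumptions, not part of this
commit; it relies on Solr's ExecutorUtil and SolrNamedThreadFactory helpers):

    // Hypothetical single-thread executor so that the calling thread returns immediately.
    private final ExecutorService nodeDownExecutor =
        ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("nodeDownStateUpdate"));

    public void publishNodeAsDownAsync(String nodeName) {
      nodeDownExecutor.submit(() ->
          distributedClusterStateUpdater.executeNodeDownStateUpdate(nodeName, zkStateReader));
    }
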
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 6e34b0f..a655b76 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -43,6 +43,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.common.SolrCloseableLatch;
@@ -197,8 +198,6 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
   }
 
   private ModifiableSolrParams getReplicaParams(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results, String collectionName, DocCollection coll, boolean skipCreateReplicaInClusterState, String asyncId, ShardHandler shardHandler, CreateReplica createReplica) throws IOException, InterruptedException, KeeperException {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-
     ZkStateReader zkStateReader = ocmh.zkStateReader;
     if (!skipCreateReplicaInClusterState) {
       ZkNodeProps props = new ZkNodeProps(
@@ -212,12 +211,19 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
       if (createReplica.coreNodeName != null) {
         props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
       }
-      try {
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-      } catch (Exception e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props,
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        try {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+        } catch (Exception e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
+        }
       }
     }
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.CORE_NODE_NAME,
         ocmh.waitToSeeReplicasInState(collectionName, Collections.singleton(createReplica.coreName)).get(createReplica.coreName).getName());
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
index 611bd2d..fb41150 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AliasCmd.java
@@ -74,7 +74,7 @@ abstract class AliasCmd implements OverseerCollectionMessageHandler.Cmd {
         new LocalSolrQueryRequest(null, createReqParams),
         null,
         ocmh.overseer.getCoreContainer().getCollectionsHandler());
-    createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
+    createMsgMap.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower());
 
     NamedList results = new NamedList();
     try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index d95501e..c10e9ca 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -38,6 +38,7 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.NotEmptyException;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.VersionedData;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.RefreshCollectionMessage;
 import org.apache.solr.cloud.ZkController;
@@ -145,17 +146,35 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       createCollectionZkNode(stateManager, collectionName, collectionParams);
 
+      // Note that the code below has two main execution paths: Overseer based cluster state updates and distributed
+      // cluster state updates (look for isDistributedStateUpdate() conditions).
+      //
+      // PerReplicaStates (PRS) collections follow a hybrid approach. Even when the cluster uses Overseer based cluster
+      // state updates, these collections are created locally and the cluster state updater is then notified (look for
+      // usages of RefreshCollectionMessage). This explains why the execution paths for PRS collections diverge less
+      // between distributed and Overseer based cluster state updates.
+
       if (isPRS) {
         // In case of a PRS collection, create the collection structure directly instead of resubmitting
         // to the overseer queue.
         // TODO: Consider doing this for all collections, not just the PRS collections.
+        // The TODO above is achieved by switching the cluster to distributed state updates.
+
+        // This code directly updates Zookeeper by creating the collection state.json. It is compatible with both distributed
+        // cluster state updates and Overseer based cluster state updates.
         ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
         byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
         ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
         clusterState = clusterState.copyWith(collectionName, command.collection);
         newColl = command.collection;
       } else {
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          // The message has been crafted by CollectionsHandler.CollectionOperation.CREATE_OP and defines the QUEUE_OPERATION
+          // to be CollectionParams.CollectionAction.CREATE.
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ClusterCreateCollection, message,
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+        }
 
         // wait for a while until we see the collection
         TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
@@ -169,14 +188,13 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
         }
 
-        // refresh cluster state
+        // refresh cluster state (the value read below comes from a Zookeeper watch firing after the update done above,
+        // whether by the Overseer or by this thread when updates are distributed)
         clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
         newColl = clusterState.getCollection(collectionName);
-
       }
 
-
-      List<ReplicaPosition> replicaPositions = null;
+      final List<ReplicaPosition> replicaPositions;
       try {
         replicaPositions = buildReplicaPositions(ocmh.overseer.getCoreContainer(), ocmh.cloudManager, clusterState, newColl,
             message, shardNames);
@@ -199,6 +217,17 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
       ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+      final DistributedClusterStateUpdater.StateChangeRecorder scr;
+
+      // PRS collections update Zookeeper directly, so even when running with distributed state updates,
+      // there's nothing to update in state.json for such collections in the loop over replica positions below.
+      if (!isPRS && ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        // The collection got created. Now we're adding replicas (and will update ZK only once when done adding).
+        scr = ocmh.getDistributedClusterStateUpdater().createStateChangeRecorder(collectionName, false);
+      } else {
+        scr = null;
+      }
+
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String nodeName = replicaPosition.node;
 
@@ -226,6 +255,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
           // to the overseer queue.
           // TODO: Consider doing this for all collections, not just the PRS collections.
+
+          // This PRS specific code is compatible with both Overseer and distributed cluster state update strategies
           ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
           byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
 //        log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
@@ -233,7 +264,11 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
           clusterState = clusterState.copyWith(collectionName, command.collection);
           newColl = command.collection;
         } else {
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+            scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
+          } else {
+            ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+          }
         }
 
         // Need to create new params for each request
@@ -266,14 +301,20 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         coresToCreate.put(coreName, sreq);
       }
 
-      // wait for all replica entries to be created
-      Map<String, Replica> replicas ;
+      // PRS collections updated Zookeeper directly above, so no StateChangeRecorder was created for them
+      if (!isPRS && ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        // Add the replicas to the collection state (all at once after the loop above)
+        scr.executeStateUpdates(ocmh.cloudManager, ocmh.zkStateReader);
+      }
+
+      final Map<String, Replica> replicas;
       if (isPRS) {
         replicas = new ConcurrentHashMap<>();
         newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
                 .filter(r -> coresToCreate.containsKey(r.getCoreName()))       // Only the elements that were asked for...
                 .forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
       } else {
+        // wait for all replica entries to be created and visible in local cluster state (updated by ZK watches)
         replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
       }
 
@@ -286,7 +327,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
       @SuppressWarnings({"rawtypes"})
       boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
-      if(isPRS) {
+      if (isPRS) {
         TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
         PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
         while (!timeout.hasTimedOut()) {
@@ -299,9 +340,14 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         } else {
           failure = true;
         }
-        // Now ask Overseer to fetch the latest state of collection
-        // from ZK
-        ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+        // When cluster state updates are distributed, the Overseer state updater is not used and doesn't have to be
+        // notified of a new collection created elsewhere (in that mode, all collections are created outside the Overseer).
+        // Note it is likely possible to skip the whole if (isPRS) block, but we keep distributed state updates as
+        // close in behavior to Overseer state updates for now.
+        if (!ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          // Now ask Overseer to fetch the latest state of collection from ZK
+          ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+        }
       }
       if (failure) {
         // Let's cleanup as we hit an exception
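
With distributed state updates, the per-replica queue offers of the Overseer path collapse into the
record-then-execute pattern used above. Reduced to its core, as a sketch reusing the names from this diff
(allReplicaProps stands in for the loop over replica positions):

    DistributedClusterStateUpdater.StateChangeRecorder scr =
        ocmh.getDistributedClusterStateUpdater().createStateChangeRecorder(collectionName, false);
    for (ZkNodeProps replicaProps : allReplicaProps) {
      // One SliceAddReplica mutation recorded per new replica; nothing is written to ZK yet.
      scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, replicaProps);
    }
    // A single synchronous state.json write covering all recorded mutations.
    scr.executeStateUpdates(ocmh.cloudManager, ocmh.zkStateReader);
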
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 989003a..04eb0f9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -78,10 +79,17 @@ public class CreateShardCmd implements OverseerCollectionMessageHandler.Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
     }
 
-    //ZkStateReader zkStateReader = ocmh.zkStateReader;
-    ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
-    // wait for a while until we see the shard
-    //ocmh.waitForNewShard(collectionName, sliceName);
+    if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      // The message has been crafted by CollectionsHandler.CollectionOperation.CREATESHARD_OP and defines the QUEUE_OPERATION
+      // to be CollectionParams.CollectionAction.CREATESHARD.
+      // Likely a bug here (in both the distributed and Overseer based paths), as we use the collection alias name and not the real name?
+      ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionCreateShard, message,
+          ocmh.cloudManager, ocmh.zkStateReader);
+    } else {
+      // message contains extCollectionName that might be an alias. Unclear (to me) how this works in that case.
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+    }
+
     // wait for a while until we see the shard and update the local view of the cluster state
     clusterState = ocmh.waitForNewShard(collectionName, sliceName);
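
Both branches above follow the same write-then-wait idiom: whether queued to the Overseer or applied
synchronously, the update only becomes visible in the local cluster state once the ZooKeeper watch has
fired. A sketch of the generic form, using the waitForState call seen elsewhere in this commit:

    // Block until the local (watch-refreshed) view of the collection shows the new shard.
    zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS,
        (c) -> c != null && c.getSlice(sliceName) != null);
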
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index 3b7eb93..4d2129f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
@@ -41,7 +42,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
@@ -63,11 +63,9 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
   private static final Set<String> okayExceptions = Collections.singleton(NonExistentCoreException.class.getName());
 
   private final OverseerCollectionMessageHandler ocmh;
-  private final TimeSource timeSource;
 
   public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
-    this.timeSource = ocmh.cloudManager.getTimeSource();
   }
 
   @Override
@@ -150,7 +148,12 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ClusterDeleteCollection, m,
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+      }
 
       // wait for a while until we don't see the collection
       zkStateReader.waitForState(collection, 60, TimeUnit.SECONDS, Objects::isNull);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index ff7edfa..79948c2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
@@ -97,7 +98,19 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       propMap.put(sliceId, Slice.State.CONSTRUCTION.toString());
       propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
       ZkNodeProps m = new ZkNodeProps(propMap);
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        // In this DeleteShardCmd.call() method there are potentially two cluster state updates. This is the first one.
+        // Even though the code of this method does not wait for it to complete, it does call the Collection API before
+        // it issues the second state change below. The collection API will be doing its own state change(s), and these will
+        // happen after this one (given it's for the same collection). Therefore we persist this state change
+        // immediately and do not group it with the one done further down.
+        // Once the Collection API is also distributed (and not only the cluster state updates), we will likely be able
+        // to batch more/all cluster state updates done by this command (DeleteShardCmd). TODO SOLR-15146
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+      }
     }
 
     String asyncId = message.getStr(ASYNC);
@@ -144,7 +157,12 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
           collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
       ZkStateReader zkStateReader = ocmh.zkStateReader;
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionDeleteShard, m,
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+      }
 
       zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
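
The sequencing constraint described at the top of this file, in outline (a sketch where updater stands for
ocmh.getDistributedClusterStateUpdater() and m, m2 are the two messages built in this command):

    // 1) Persist slice -> CONSTRUCTION immediately, so the Collection API calls below observe it.
    updater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
        ocmh.cloudManager, ocmh.zkStateReader);
    // 2) The intervening Collection API calls (replica deletions) issue their own updates for the collection.
    // 3) Only then is the final shard removal persisted.
    updater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionDeleteShard, m2,
        ocmh.cloudManager, ocmh.zkStateReader);
    // Batching 1) and 3) into one StateChangeRecorder would break their ordering with 2); see SOLR-15146.
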
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index 2b094b2..1b4b019 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.OverseerAction;
@@ -201,7 +202,12 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
         "targetCollection", targetCollection.getName(),
         "expireAt", RoutingRule.makeExpiryAt(timeout));
     log.info("Adding routing rule: {}", m);
-    ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+    if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddRoutingRule, m,
+          ocmh.cloudManager, ocmh.zkStateReader);
+    } else {
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+    }
 
     // wait for a while until we see the new rule
     log.info("Waiting to see routing rule updated in clusterstate");
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 53a02fa..c66a65f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -29,6 +29,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.LockTree;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerMessageHandler;
@@ -149,6 +150,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   String myId;
   Stats stats;
   TimeSource timeSource;
+  private final DistributedClusterStateUpdater distributedClusterStateUpdater;
 
   // Set that tracks collections that are currently being processed by a running task.
   // This is used for handling mutual exclusion of the tasks.
@@ -186,6 +188,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     this.myId = myId;
     this.stats = stats;
     this.overseer = overseer;
+    this.distributedClusterStateUpdater = overseer.getDistributedClusterStateUpdater();
     this.cloudManager = overseer.getSolrCloudManager();
     this.timeSource = cloudManager.getTimeSource();
     this.isClosed = false;
@@ -267,6 +270,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     return new OverseerSolrResponse(results);
   }
 
+  DistributedClusterStateUpdater getDistributedClusterStateUpdater() {
+    return distributedClusterStateUpdater;
+  }
+
   @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
   @SuppressWarnings({"unchecked"})
   private void mockOperation(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws InterruptedException {
@@ -328,23 +335,31 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
       throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
-    SolrZkClient zkClient = zkStateReader.getZkClient();
     Map<String, Object> propMap = new HashMap<>();
     propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
     propMap.putAll(message.getProperties());
     ZkNodeProps m = new ZkNodeProps(propMap);
-    overseer.offerStateUpdate(Utils.toJSON(m));
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ReplicaAddReplicaProperty, m,
+          cloudManager, zkStateReader);
+    } else {
+      overseer.offerStateUpdate(Utils.toJSON(m));
+    }
   }
 
   private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
       throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
-    SolrZkClient zkClient = zkStateReader.getZkClient();
     Map<String, Object> propMap = new HashMap<>();
     propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
     propMap.putAll(message.getProperties());
     ZkNodeProps m = new ZkNodeProps(propMap);
-    overseer.offerStateUpdate(Utils.toJSON(m));
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ReplicaDeleteReplicaProperty, m,
+          cloudManager, zkStateReader);
+    } else {
+      overseer.offerStateUpdate(Utils.toJSON(m));
+    }
   }
 
   private void balanceProperty(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
@@ -353,11 +368,15 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
           "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
               "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
     }
-    SolrZkClient zkClient = zkStateReader.getZkClient();
     Map<String, Object> m = new HashMap<>();
     m.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
     m.putAll(message.getProperties());
-    overseer.offerStateUpdate(Utils.toJSON(m));
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.BalanceShardsUnique, new ZkNodeProps(m),
+          cloudManager, zkStateReader);
+    } else {
+      overseer.offerStateUpdate(Utils.toJSON(m));
+    }
   }
 
   /**
@@ -424,7 +443,12 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
         ZkStateReader.COLLECTION_PROP, collectionName,
         ZkStateReader.CORE_NODE_NAME_PROP, replicaName);
-    overseer.offerStateUpdate(Utils.toJSON(m));
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+          cloudManager, zkStateReader);
+    } else {
+      overseer.offerStateUpdate(Utils.toJSON(m));
+    }
   }
 
   void checkRequired(ZkNodeProps message, String... props) {
@@ -554,7 +578,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
     }
 
-    overseer.offerStateUpdate(Utils.toJSON(message));
+    if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+      // Apply the state update right away. The wait below is still useful: it returns once the change is visible in the local cluster state (i.e. the watchers have fired).
+      distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, message,
+          cloudManager, zkStateReader);
+    } else {
+      overseer.offerStateUpdate(Utils.toJSON(message));
+    }
 
     try {
       zkStateReader.waitForState(collectionName, 30, TimeUnit.SECONDS, c -> {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
index 7bc51c9..f0a04d2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
@@ -60,6 +60,16 @@ public class OverseerStatusCmd implements OverseerCollectionMessageHandler.Cmd {
     zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
     results.add("overseer_collection_queue_size", stat.getNumChildren());
 
+    // The Overseer reported stats below are tracked in the Overseer cluster state updater when it performs certain operations.
+    // Sharing the ocmh.stats variable between the cluster state updater and the Collection API (this command) is incidentally
+    // about the only thing that ties the cluster state updater to the Collection API message handler, and it takes
+    // advantage of the fact that both run on the same node (the Overseer node). (The recently added PerReplicaStates also
+    // take advantage of this through the method Overseer.submit().)
+    // When distributed updates are enabled, cluster state updates are not done by the Overseer (it doesn't even see them),
+    // so it cannot report them. The corresponding data in OVERSEERSTATUS (all data built below) is no longer returned.
+    // This means the top level keys "overseer_operations", "collection_operations", "overseer_queue", "overseer_internal_queue"
+    // and "collection_queue" are either empty or do not contain all expected information when cluster state updates are distributed.
+
     @SuppressWarnings({"rawtypes"})
     NamedList overseerStats = new NamedList();
     @SuppressWarnings({"rawtypes"})
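
Given the above, consumers of OVERSEERSTATUS responses should treat these keys as optional; a hypothetical
defensive read on the client side:

    // Hypothetical client-side check: stats keys may be absent when state updates are distributed.
    @SuppressWarnings({"rawtypes"})
    NamedList collectionOps = (NamedList) response.get("collection_operations");
    if (collectionOps == null) {
      // Cluster state updates bypassed the Overseer, which therefore had no stats to report.
    }
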
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index c45c772..30cfa03 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -40,6 +40,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -370,7 +371,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, collection,
           ZkStateReader.READ_ONLY, "true");
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, cmd,
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(cmd));
+      }
 
       TestInjection.injectReindexLatch();
 
@@ -477,14 +483,24 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
             Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
             ZkStateReader.COLLECTION_PROP, collection,
             ZkStateReader.READ_ONLY, null);
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, props,
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+        }
       }
       // 9. set FINISHED state on the target and clear the state on the source
       ZkNodeProps props = new ZkNodeProps(
           Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
           ZkStateReader.COLLECTION_PROP, targetCollection,
           REINDEXING_STATE, State.FINISHED.toLower());
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, props,
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      }
 
       reindexingState.put(STATE, State.FINISHED.toLower());
       reindexingState.put(PHASE, "done");
@@ -804,7 +820,12 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
         Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
         ZkStateReader.COLLECTION_PROP, collection,
         ZkStateReader.READ_ONLY, null);
-    ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+    if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionModifyCollection, props,
+          ocmh.cloudManager, ocmh.zkStateReader);
+    } else {
+      ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+    }
     removeReindexingState(collection);
   }
 }
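
ReindexCollectionCmd toggles the source collection's read-only flag through the MODIFYCOLLECTION messages
above; as the diff shows, passing null for READ_ONLY clears the property again. The set/clear pair,
condensed as a sketch with the names from this diff:

    // Set read-only before copying documents out of the source collection.
    ZkNodeProps setReadOnly = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
        ZkStateReader.COLLECTION_PROP, collection,
        ZkStateReader.READ_ONLY, "true");
    // Clear it once done: a null value removes the property from the collection state.
    ZkNodeProps clearReadOnly = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MODIFYCOLLECTION.toLower(),
        ZkStateReader.COLLECTION_PROP, collection,
        ZkStateReader.READ_ONLY, null);
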
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 48a584b..774e054 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -17,7 +17,26 @@
 
 package org.apache.solr.cloud.api.collections;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.OverseerAction;
@@ -48,25 +67,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 import static org.apache.solr.common.cloud.ZkStateReader.*;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
@@ -343,7 +343,12 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
         propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
       }
       propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollection.getName());
-      ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, new ZkNodeProps(propMap),
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+      }
     }
 
     private List<ReplicaPosition> getReplicaPositions(DocCollection restoreCollection, List<String> nodeList, ClusterState clusterState, List<String> sliceNames) throws IOException, InterruptedException {
@@ -461,7 +466,12 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
       for (Slice shard : restoreCollection.getSlices()) {
         propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
       }
-      ocmh.overseer.offerStateUpdate((Utils.toJSON(new ZkNodeProps(propMap))));
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, new ZkNodeProps(propMap),
+            ocmh.cloudManager, ocmh.zkStateReader);
+      } else {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+      }
     }
 
     private void addReplicasToShards(@SuppressWarnings({"rawtypes"}) NamedList results,
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index ff9df3b..2c0ad69 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -23,6 +23,7 @@ import org.apache.solr.client.solrj.cloud.NodeStateProvider;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.cloud.VersionedData;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
 import org.apache.solr.cloud.overseer.OverseerAction;
@@ -245,7 +246,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
             // delete the shards
             log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
             Map<String, Object> propMap = new HashMap<>();
-            propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
+            propMap.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower());
             propMap.put(COLLECTION_PROP, collectionName);
             propMap.put(SHARD_ID_PROP, subSlice);
             ZkNodeProps m = new ZkNodeProps(propMap);
@@ -288,7 +289,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         propMap.put("shard_parent_node", nodeName);
         propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
 
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.CollectionCreateShard, new ZkNodeProps(propMap),
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+        }
 
         // wait until we are able to see the new shard in cluster state and refresh the local view of the cluster state
         clusterState = ocmh.waitForNewShard(collectionName, subSlice);
@@ -442,6 +448,13 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       t.stop();
 
       t = timings.sub("createReplicaPlaceholders");
+      final DistributedClusterStateUpdater.StateChangeRecorder scr;
+      boolean hasRecordedDistributedUpdate = false;
+      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        scr = ocmh.getDistributedClusterStateUpdater().createStateChangeRecorder(collectionName, false);
+      } else {
+        scr = null;
+      }
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String sliceName = replicaPosition.shard;
         String subShardNodeName = replicaPosition.node;
@@ -462,7 +475,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
             ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
             ZkStateReader.NODE_NAME_PROP, subShardNodeName,
             CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          hasRecordedDistributedUpdate = true;
+          scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+        }
 
         HashMap<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -488,6 +506,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
 
         replicas.add(propMap);
       }
+      if (hasRecordedDistributedUpdate && ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+        // Actually add the replicas to the collection state. Note that when the Overseer takes care of the state,
+        // there is no wait here for the state update to become visible, but with distributed state updates done
+        // synchronously we do wait (a thread could do the work in theory if we REALLY needed it, but we likely don't).
+        scr.executeStateUpdates(ocmh.cloudManager, ocmh.zkStateReader);
+      }
       t.stop();
       assert TestInjection.injectSplitFailureBeforeReplicaCreation();
 
@@ -504,12 +528,17 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         }
         propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
         ZkNodeProps m = new ZkNodeProps(propMap);
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        }
 
         if (leaderZnodeStat == null)  {
           // the leader is not live anymore, fail the split!
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
         } else if (ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
           // there's a new leader, fail the split!
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "The zk session id for the shard leader node: " + parentShardLeader.getNodeName() + " has changed from "
@@ -538,7 +567,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         }
         propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
         ZkNodeProps m = new ZkNodeProps(propMap);
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        }
       } else {
         log.info("Requesting shard state be set to 'recovery'");
         Map<String, Object> propMap = new HashMap<>();
@@ -548,7 +582,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
         }
         propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
         ZkNodeProps m = new ZkNodeProps(propMap);
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        }
       }
 
       t = timings.sub("createCoresForReplicas");
@@ -709,7 +748,12 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
     if (sendUpdateState) {
       try {
         ZkNodeProps m = new ZkNodeProps(propMap);
-        ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+              ocmh.cloudManager, ocmh.zkStateReader);
+        } else {
+          ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
+        }
       } catch (Exception e) {
         // don't give up yet - just log the error, we may still be able to clean up
         log.warn("Cleanup failed after failed split of {}/{}: (slice state changes)", collectionName, parentShard, e);
@@ -724,7 +768,7 @@ public class SplitShardCmd implements OverseerCollectionMessageHandler.Cmd {
       }
       log.debug("- sub-shard: {} exists therefore requesting its deletion", subSlice);
       HashMap<String, Object> props = new HashMap<>();
-      props.put(Overseer.QUEUE_OPERATION, "deleteshard");
+      props.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower());
       props.put(COLLECTION_PROP, collectionName);
       props.put(SHARD_ID_PROP, subSlice);
       ZkNodeProps m = new ZkNodeProps(props);
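
The if/else dispatch above recurs at every state update site touched by this commit: when distributed updates are
enabled, the mutation is applied to ZooKeeper synchronously by the node running the command; otherwise the message
is enqueued for the Overseer to process asynchronously. A minimal sketch of the pattern as a helper inside a command
class, assuming the ocmh fields used throughout this file (submitStateUpdate itself is illustrative, not part of
this patch):

    // Hypothetical helper; every call it makes appears in this patch.
    private void submitStateUpdate(DistributedClusterStateUpdater.MutatingCommand command,
                                   ZkNodeProps message) throws Exception {
      if (ocmh.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
        // Synchronous: the update is written to ZooKeeper before this returns.
        ocmh.getDistributedClusterStateUpdater().doSingleStateUpdate(
            command, message, ocmh.cloudManager, ocmh.zkStateReader);
      } else {
        // Asynchronous: enqueued for the Overseer; no wait for visibility.
        ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
      }
    }

For batched changes (as in the replica placeholder loop above), the distributed path instead records each command on
a StateChangeRecorder and applies them all at once with executeStateUpdates().
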
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 073bdaf..94c89a0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -26,14 +26,12 @@ import org.apache.solr.client.solrj.cloud.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.cloud.*;
-import org.apache.solr.common.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.params.CommonParams.NAME;
 
 public class CollectionMutator {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -155,24 +153,9 @@ public class CollectionMutator {
   }
 
   public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
-    DocCollection newCollection = null;
-    Map<String, Slice> slices;
-
-    if (collection == null) {
-      //  when updateSlice is called on a collection that doesn't exist, it's currently when a core is publishing itself
-      // without explicitly creating a collection.  In this current case, we assume custom sharding with an "implicit" router.
-      slices = new LinkedHashMap<>(1);
-      slices.put(slice.getName(), slice);
-      Map<String, Object> props = new HashMap<>(1);
-      props.put(DocCollection.DOC_ROUTER, Utils.makeMap(NAME, ImplicitDocRouter.NAME));
-      newCollection = new DocCollection(collectionName, slices, props, new ImplicitDocRouter());
-    } else {
-      slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
-      slices.put(slice.getName(), slice);
-      newCollection = collection.copyWithSlices(slices);
-    }
-
-    return newCollection;
+    Map<String, Slice> slices = new LinkedHashMap<>(collection.getSlicesMap()); // make a shallow copy
+    slices.put(slice.getName(), slice);
+    return collection.copyWithSlices(slices);
   }
 
   static boolean checkCollectionKeyExistence(ZkNodeProps message) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 48e0a16..adf146d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -39,57 +40,74 @@ public class NodeMutator {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
-    List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
     String nodeName = message.getStr(ZkStateReader.NODE_NAME_PROP);
 
     log.debug("DownNode state invoked for node: {}", nodeName);
 
+    List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
+
     Map<String, DocCollection> collections = clusterState.getCollectionsMap();
     for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
-       List<String> downedReplicas = new ArrayList<>();
-
-      String collection = entry.getKey();
+      String collectionName = entry.getKey();
       DocCollection docCollection = entry.getValue();
 
-      Map<String,Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
-
-      boolean needToUpdateCollection = false;
-      for (Entry<String, Slice> sliceEntry : slicesCopy.entrySet()) {
-        Slice slice = sliceEntry.getValue();
-        Map<String, Replica> newReplicas = slice.getReplicasCopy();
-
-        Collection<Replica> replicas = slice.getReplicas();
-        for (Replica replica : replicas) {
-          String rNodeName = replica.getNodeName();
-          if (rNodeName == null) {
-            throw new RuntimeException("Replica without node name! " + replica);
-          }
-          if (rNodeName.equals(nodeName)) {
-            log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
-            Map<String, Object> props = replica.shallowCopy();
-            Replica newReplica = new Replica(replica.getName(), replica.node, replica.collection, slice.getName(), replica.core,
-                Replica.State.DOWN, replica.type, props);
-            newReplicas.put(replica.getName(), newReplica);
-            needToUpdateCollection = true;
-            downedReplicas.add(replica.getName());
-          }
-        }
+      Optional<ZkWriteCommand> zkWriteCommand = computeCollectionUpdate(nodeName, collectionName, docCollection);
 
-        Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy(),collection);
-        slicesCopy.put(slice.getName(), newSlice);
+      if (zkWriteCommand.isPresent()) {
+        zkWriteCommands.add(zkWriteCommand.get());
       }
+    }
+
+    return zkWriteCommands;
+  }
+
+  /**
+   * Computes the write command needed to update the replicas of a given collection when a given node goes down.
+   * @return An {@link Optional} with the write command, or an empty {@link Optional} if the collection does not need
+   *    any state modification. The returned write command might be for per replica state updates or for an update to
+   *    state.json, depending on the configuration of the collection.
+   */
+  public static Optional<ZkWriteCommand> computeCollectionUpdate(String nodeName, String collectionName, DocCollection docCollection) {
+    boolean needToUpdateCollection = false;
+    List<String> downedReplicas = new ArrayList<>();
+    Map<String,Slice> slicesCopy = new LinkedHashMap<>(docCollection.getSlicesMap());
 
-      if (needToUpdateCollection) {
-        if (docCollection.isPerReplicaState()) {
-          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
-              PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
-        } else {
-          zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+    for (Entry<String, Slice> sliceEntry : slicesCopy.entrySet()) {
+      Slice slice = sliceEntry.getValue();
+      Map<String, Replica> newReplicas = slice.getReplicasCopy();
+
+      Collection<Replica> replicas = slice.getReplicas();
+      for (Replica replica : replicas) {
+        String rNodeName = replica.getNodeName();
+        if (rNodeName == null) {
+          throw new RuntimeException("Replica without node name! " + replica);
+        }
+        if (rNodeName.equals(nodeName)) {
+          log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
+          Map<String, Object> props = replica.shallowCopy();
+          Replica newReplica = new Replica(replica.getName(), replica.node, replica.collection, slice.getName(), replica.core,
+              Replica.State.DOWN, replica.type, props);
+          newReplicas.put(replica.getName(), newReplica);
+          needToUpdateCollection = true;
+          downedReplicas.add(replica.getName());
         }
       }
+
+      Slice newSlice = new Slice(slice.getName(), newReplicas, slice.shallowCopy(), collectionName);
+      slicesCopy.put(slice.getName(), newSlice);
     }
 
-    return zkWriteCommands;
+    if (needToUpdateCollection) {
+      if (docCollection.isPerReplicaState()) {
+        return Optional.of(new ZkWriteCommand(collectionName, docCollection.copyWithSlices(slicesCopy),
+            PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+      } else {
+        return Optional.of(new ZkWriteCommand(collectionName, docCollection.copyWithSlices(slicesCopy)));
+      }
+    } else {
+      // No update needed for this collection
+      return Optional.empty();
+    }
   }
 }
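
Extracting computeCollectionUpdate() out of downNode() lets the distributed code path compute the down-node update
for a single collection without iterating the whole cluster state. A usage sketch under that assumption (variable
names illustrative):

    // For one collection, compute the command marking its replicas on the node as down.
    DocCollection docCollection = clusterState.getCollection(collectionName);
    Optional<ZkWriteCommand> update =
        NodeMutator.computeCollectionUpdate(nodeName, collectionName, docCollection);
    // An empty Optional means no replica of this collection lives on the downed node.
    update.ifPresent(zkWriteCommands::add);
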
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 2594ee4..97285cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -229,7 +229,7 @@ public class SliceMutator {
     DocCollection collection = clusterState.getCollection(collectionName);
     Slice slice = collection.getSlice(shard);
     if (slice == null) {
-      throw new RuntimeException("Overseer.addRoutingRule unknown collection: " + collectionName + " slice:" + shard);
+      throw new RuntimeException("Overseer.addRoutingRule collection: " + collectionName + ", unknown slice: " + shard);
     }
 
     Map<String, RoutingRule> routingRules = slice.getRoutingRules();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 978b209..fd2e020 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -28,6 +28,7 @@ import org.apache.solr.cloud.Stats;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.CreateMode;
@@ -59,7 +60,7 @@ public class ZkStateWriter {
   /**
    * Represents a no-op {@link ZkWriteCommand} which will result in no modification to cluster state
    */
-  public static ZkWriteCommand NO_OP = ZkWriteCommand.noop();
+  public static ZkWriteCommand NO_OP = ZkWriteCommand.NO_OP;
 
   protected final ZkStateReader reader;
   protected final Stats stats;
@@ -217,14 +218,17 @@ public class ZkStateWriter {
           ZkWriteCommand cmd = entry.getValue();
           DocCollection c = cmd.collection;
 
-          if(cmd.ops != null && cmd.ops.isPreOp()) {
+          // Update the Per Replica State znodes if needed
+          if (cmd.ops != null) {
             cmd.ops.persist(path, reader.getZkClient());
             clusterState = clusterState.copyWith(name,
                   cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
           }
-          if (!cmd.persistCollState) continue;
+
+          // Update the state.json file if needed
+          if (!cmd.persistJsonState) continue;
           if (c == null) {
-            // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
+            // let's clean up the state.json of this collection only, the rest should be cleaned by delete collection cmd
             log.debug("going to delete state.json {}", path);
             reader.getZkClient().clean(path);
           } else {
@@ -243,13 +247,18 @@ public class ZkStateWriter {
               clusterState = clusterState.copyWith(name, newCollection);
             }
           }
-            if(cmd.ops != null && !cmd.ops.isPreOp()) {
-              cmd.ops.persist(path, reader.getZkClient());
-              DocCollection currentCollState = clusterState.getCollection(cmd.name);
-              if ( currentCollState != null) {
-                clusterState = clusterState.copyWith(name,
-                        currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
-              }
+
+          // When dealing with a per replica state collection that did not update any of the per replica states znodes
+          // but did update state.json, we add then remove a dummy child node to change the cversion of the parent znode.
+          // Solr itself does not need this: no Solr code watches the children without also watching the state.json node.
+          // It is useful for external code watching the children of a collection's state.json znode but not the znode itself.
+          if (cmd.ops == null && cmd.isPerReplicaStateCollection) {
+            PerReplicaStatesOps.touchChildren().persist(path, reader.getZkClient());
+            DocCollection currentCollState = clusterState.getCollection(cmd.name);
+            if (currentCollState != null) {
+              clusterState = clusterState.copyWith(name,
+                      currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
+            }
           }
         }
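
The "touch" mentioned in the comment above bumps the parent znode's cversion by adding then removing a dummy child,
exactly as described. Sketched with raw SolrZkClient calls (illustrative only; PerReplicaStatesOps.touchChildren()
encapsulates the real operations, and the child name here is made up):

    // Creating then deleting a child changes the parent's cversion without
    // touching its data: children watchers fire, data watchers do not.
    String dummy = stateJsonPath + "/dummy";
    zkClient.create(dummy, null, CreateMode.PERSISTENT, true);
    zkClient.delete(dummy, -1, true);
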
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index 39d953c..57fc3a5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -20,51 +20,34 @@ import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.PerReplicaStatesOps;
 
 public class ZkWriteCommand {
+  /**
+   * Single NO_OP instance, can be compared with ==
+   */
+  static final ZkWriteCommand NO_OP = new ZkWriteCommand(null, null);
 
   public final String name;
   public final DocCollection collection;
+  public final boolean isPerReplicaStateCollection;
 
-  public final boolean noop;
   // persist the collection state. If this is false, it means the collection state is not modified
-  public final boolean persistCollState;
+  public final boolean persistJsonState;
   public final PerReplicaStatesOps ops;
 
-  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistCollState) {
-    boolean isPerReplicaState = collection.isPerReplicaState();
-    this.name = name;
-    this.collection = collection;
-    this.noop = false;
-    this.ops = isPerReplicaState ? replicaOps : null;
-    this.persistCollState = isPerReplicaState ? persistCollState : true;
-  }
-  public ZkWriteCommand(String name, DocCollection collection) {
+  public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistJsonState) {
+    isPerReplicaStateCollection = collection != null && collection.isPerReplicaState();
     this.name = name;
     this.collection = collection;
-    this.noop = false;
-    persistCollState = true;
-    this.ops = collection != null && collection.isPerReplicaState() ?
-        PerReplicaStatesOps.touchChildren():
-        null;
+    this.ops = replicaOps;
+    this.persistJsonState = persistJsonState || !isPerReplicaStateCollection; // Always persist for non "per replica state" collections
   }
 
-  /**
-   * Returns a no-op
-   */
-  protected ZkWriteCommand() {
-    this.noop = true;
-    this.name = null;
-    this.collection = null;
-    this.ops = null;
-    persistCollState = true;
-  }
-
-  public static ZkWriteCommand noop() {
-    return new ZkWriteCommand();
+  public ZkWriteCommand(String name, DocCollection collection) {
+    this(name, collection, null, true);
   }
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + ": " + (noop ? "no-op" : name + "=" + collection);
+    return getClass().getSimpleName() + ": " + (this == NO_OP ? "no-op" : name + "=" + collection);
   }
 }
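
With the singleton NO_OP and the persistCollState to persistJsonState rename, consumers can branch on object
identity and on what actually needs persisting. A minimal consumer sketch (assumed call site in the same package,
since NO_OP is package-private):

    void apply(ZkWriteCommand cmd) {
      if (cmd == ZkWriteCommand.NO_OP) {
        return; // single instance, identity comparison replaces the old noop flag
      }
      if (cmd.ops != null) {
        // per replica state znode operations to persist
      }
      if (cmd.persistJsonState) {
        // state.json must be rewritten; the constructor forces this to true
        // for collections not using per replica states
      }
    }
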
 
diff --git a/solr/core/src/java/org/apache/solr/core/CloudConfig.java b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
index c8efbbb..440f128 100644
--- a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
@@ -48,10 +48,12 @@ public class CloudConfig {
 
   private final String pkiHandlerPublicKeyPath;
 
+  private final boolean useDistributedClusterStateUpdates;
+
   CloudConfig(String zkHost, int zkClientTimeout, int hostPort, String hostName, String hostContext, boolean useGenericCoreNames,
               int leaderVoteWait, int leaderConflictResolveWait, String zkCredentialsProviderClass, String zkACLProviderClass,
               int createCollectionWaitTimeTillActive, boolean createCollectionCheckLeaderActive, String pkiHandlerPrivateKeyPath,
-              String pkiHandlerPublicKeyPath) {
+              String pkiHandlerPublicKeyPath, boolean useDistributedClusterStateUpdates) {
     this.zkHost = zkHost;
     this.zkClientTimeout = zkClientTimeout;
     this.hostPort = hostPort;
@@ -66,6 +68,7 @@ public class CloudConfig {
     this.createCollectionCheckLeaderActive = createCollectionCheckLeaderActive;
     this.pkiHandlerPrivateKeyPath = pkiHandlerPrivateKeyPath;
     this.pkiHandlerPublicKeyPath = pkiHandlerPublicKeyPath;
+    this.useDistributedClusterStateUpdates = useDistributedClusterStateUpdates;
 
     if (this.hostPort == -1)
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'hostPort' must be configured to run SolrCloud");
@@ -129,6 +132,10 @@ public class CloudConfig {
     return pkiHandlerPublicKeyPath;
   }
 
+  public boolean getDistributedClusterStateUpdates() {
+    return useDistributedClusterStateUpdates;
+  }
+
   public static class CloudConfigBuilder {
 
     private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 45000;
@@ -151,6 +158,7 @@ public class CloudConfig {
     private boolean createCollectionCheckLeaderActive = DEFAULT_CREATE_COLLECTION_CHECK_LEADER_ACTIVE;
     private String pkiHandlerPrivateKeyPath;
     private String pkiHandlerPublicKeyPath;
+    private boolean useDistributedClusterStateUpdates = false;
 
     public CloudConfigBuilder(String hostName, int hostPort) {
       this(hostName, hostPort, null);
@@ -217,10 +225,15 @@ public class CloudConfig {
       return this;
     }
 
+    public CloudConfigBuilder setUseDistributedClusterStateUpdates(boolean useDistributedClusterStateUpdates) {
+      this.useDistributedClusterStateUpdates = useDistributedClusterStateUpdates;
+      return this;
+    }
+
     public CloudConfig build() {
       return new CloudConfig(zkHost, zkClientTimeout, hostPort, hostName, hostContext, useGenericCoreNames, leaderVoteWait,
           leaderConflictResolveWait, zkCredentialsProviderClass, zkACLProviderClass, createCollectionWaitTimeTillActive,
-          createCollectionCheckLeaderActive, pkiHandlerPrivateKeyPath, pkiHandlerPublicKeyPath);
+          createCollectionCheckLeaderActive, pkiHandlerPrivateKeyPath, pkiHandlerPublicKeyPath, useDistributedClusterStateUpdates);
     }
   }
 }
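
A short example of enabling the new flag when building the config programmatically; only methods visible in this
patch are used, and the host name and port are placeholders:

    CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("host1", 8983)
        .setUseDistributedClusterStateUpdates(true)
        .build();
    assert cloudConfig.getDistributedClusterStateUpdates();
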
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index b222c27..c922982 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -485,6 +485,9 @@ public class SolrXmlConfig {
         case "pkiHandlerPublicKeyPath":
           builder.setPkiHandlerPublicKeyPath(value);
           break;
+        case "distributedClusterStateUpdates":
+          builder.setUseDistributedClusterStateUpdates(Boolean.parseBoolean(value));
+          break;
         default:
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown configuration parameter in <solrcloud> section of solr.xml: " + name);
       }
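
In solr.xml this surfaces as a new entry in the <solrcloud> section, defaulting to false when absent. The test
solr.xml later in this patch uses the system property fallback form:

    <solrcloud>
      <str name="distributedClusterStateUpdates">${solr.distributedClusterStateUpdates:false}</str>
    </solrcloud>
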
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 54aaf28..3c51a6c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.OverseerSolrResponseSerializer;
 import org.apache.solr.cloud.OverseerTaskQueue;
@@ -125,13 +126,12 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
   protected final CoreContainer coreContainer;
   private final CollectionHandlerApi v2Handler;
+  private final DistributedClusterStateUpdater distributedClusterStateUpdater;
 
   public CollectionsHandler() {
-    super();
     // Unlike most request handlers, CoreContainer initialization
     // should happen in the constructor...
-    this.coreContainer = null;
-    v2Handler = new CollectionHandlerApi(this);
+    this(null);
   }
 
 
@@ -143,6 +143,17 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
   public CollectionsHandler(final CoreContainer coreContainer) {
     this.coreContainer = coreContainer;
     v2Handler = new CollectionHandlerApi(this);
+    // Get the state change factory to know if we need to enqueue state updates to the Overseer or process them distributed.
+    // Some SolrCloud tests do not need Zookeeper and end up with a null cloudConfig in NodeConfig (because
+    // TestHarness.buildTestNodeConfig() uses the zkHost to decide whether it's SolrCloud).
+    // These tests do not use Zookeeper and do not do state updates (see subclasses of TestBaseStatsCacheCloud).
+    // Some non SolrCloud tests do not even pass a config at all, so let's be cautious here (code is not pretty).
+    // We do want to initialize here rather than lazily, to spare actual prod code from dealing with synchronization.
+    if (coreContainer == null || coreContainer.getConfig() == null || coreContainer.getConfig().getCloudConfig() == null) {
+      distributedClusterStateUpdater = null;
+    } else {
+      distributedClusterStateUpdater = new DistributedClusterStateUpdater(coreContainer.getConfig().getCloudConfig().getDistributedClusterStateUpdates());
+    }
   }
 
   @Override
@@ -246,7 +257,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         rsp.setException(exp);
       }
 
-      //TODO yuck; shouldn't create-collection at the overseer do this?  (conditionally perhaps)
+      // Even if the Overseer does wait for the collection to be created, it sees a different cluster state than this node,
+      // so this wait is required to make sure the local node's Zookeeper watches have fired and it now sees the collection.
       if (action.equals(CollectionAction.CREATE) && asyncId == null) {
         if (rsp.getException() == null) {
           waitForActiveCollection(zkProps.getStr(NAME), cores, overseerResponse);
@@ -254,8 +266,16 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
 
     } else {
-      // submits and doesn't wait for anything (no response)
-      coreContainer.getZkController().getOverseer().offerStateUpdate(Utils.toJSON(props));
+      if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+        DistributedClusterStateUpdater.MutatingCommand command = DistributedClusterStateUpdater.MutatingCommand.getCommandFor(operation.action);
+        ZkNodeProps message = new ZkNodeProps(props);
+        // We do the state change synchronously but do not wait for it to become visible in this node's cluster state (updated via ZK watches)
+        distributedClusterStateUpdater.doSingleStateUpdate(command, message,
+            coreContainer.getZkController().getSolrCloudManager(), coreContainer.getZkController().getZkStateReader());
+      } else {
+        // submits and doesn't wait for anything (no response)
+        coreContainer.getZkController().getOverseer().offerStateUpdate(Utils.toJSON(props));
+      }
     }
 
   }
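
Note the asymmetry called out in the comment above: the distributed path applies the change synchronously, but the
local ZkStateReader only learns about it through its ZooKeeper watches. A caller needing the local view refreshed
would still wait explicitly; a sketch using the existing ZkStateReader API (timeout value illustrative):

    // Wait until the local cluster state reflects the change (sketch).
    coreContainer.getZkController().getZkStateReader().waitForState(collection,
        30, TimeUnit.SECONDS, (liveNodes, coll) -> coll != null);
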
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index aa7c61e..ece4f53 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.DistributedClusterStateUpdater;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.ZkShardTerms;
@@ -48,6 +49,7 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.RoutingRule;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CommonParams;
@@ -97,6 +99,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   // this is set to true in the constructor if the next processors in the chain
   // are custom and may modify the SolrInputDocument racing with its serialization for replication
   private final boolean cloneRequiredOnLeader;
+  private final DistributedClusterStateUpdater distributedClusterStateUpdater;
 
   //used for keeping track of replicas that have processed an add/update from the leader
   private RollupRequestReplicationTracker rollupReplicationTracker = null;
@@ -110,6 +113,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     CoreContainer cc = req.getCore().getCoreContainer();
     cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
     zkController = cc.getZkController();
+    distributedClusterStateUpdater = zkController.getDistributedClusterStateUpdater();
     cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
     cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
     collection = cloudDesc.getCollectionName();
@@ -936,7 +940,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
                           ZkStateReader.COLLECTION_PROP, collection,
                           ZkStateReader.SHARD_ID_PROP, myShardId,
                           "routeKey", routeKey + "!");
-                      zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
+                      if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
+                        ZkNodeProps message = new ZkNodeProps(map);
+                        distributedClusterStateUpdater.doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceRemoveRoutingRule, message,
+                            zkController.getOverseer().getSolrCloudManager(), zkController.getOverseer().getZkStateReader());
+                      } else {
+                        zkController.getOverseer().offerStateUpdate(Utils.toJSON(map));
+                      }
                     } catch (KeeperException e) {
                       log.warn("Exception while removing routing rule for route key: {}", routeKey, e);
                     } catch (Exception e) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java b/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java
index 11e0055..5370713 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CreateCollectionCleanupTest.java
@@ -56,6 +56,7 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
       "    <int name=\"distribUpdateConnTimeout\">${distribUpdateConnTimeout:45000}</int>\n" +
       "    <int name=\"distribUpdateSoTimeout\">${distribUpdateSoTimeout:340000}</int>\n" +
       "    <int name=\"createCollectionWaitTimeTillActive\">${createCollectionWaitTimeTillActive:10}</int>\n" +
+      "    <str name=\"distributedClusterStateUpdates\">${solr.distributedClusterStateUpdates:false}</str> \n" +
       "  </solrcloud>\n" +
       "  \n" +
       "</solr>\n";
@@ -66,6 +67,7 @@ public class CreateCollectionCleanupTest extends SolrCloudTestCase {
     configureCluster(1)
         .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .withSolrXml(CLOUD_SOLR_XML_WITH_10S_CREATE_COLL_WAIT)
+        .useOtherClusterStateUpdateStrategy()
         .configure();
   }
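
Since the test solr.xml above reads ${solr.distributedClusterStateUpdates:false}, the strategy can presumably also
be forced through the system property rather than the cluster builder, e.g. (sketch; property name taken from the
solr.xml string above):

    // Force distributed updates for this JVM before the cluster is configured.
    System.setProperty("solr.distributedClusterStateUpdates", "true");
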
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index e721517..de1d19a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -77,6 +77,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     // these tests need to be isolated, so we dont share the minicluster
     configureCluster(4)
         .addConfig("conf", configset("cloud-minimal"))
+        .useOtherClusterStateUpdateStrategy() // Some tests (this one) use "the other" cluster state update strategy to increase coverage
         .configure();
   }
   
@@ -231,7 +232,8 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
                                          ! r.equals(shard.getLeader())));
     
     JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
-    ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
+    ZkController replicaZkController = replicaJetty.getCoreContainer().getZkController();
+    ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaZkController.getZkStateReader());
 
     final long preDeleteWatcherCount = countUnloadCoreOnDeletedWatchers
       (accessor.getStateWatchers(collectionName));
@@ -243,7 +245,14 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
         ZkStateReader.COLLECTION_PROP, collectionName,
         ZkStateReader.CORE_NODE_NAME_PROP, replica.getName());
 
-    cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+    if (replicaZkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      cluster.getOpenOverseer().getDistributedClusterStateUpdater().doSingleStateUpdate(
+          DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+          cluster.getOpenOverseer().getSolrCloudManager(),
+          cluster.getOpenOverseer().getZkStateReader());
+    } else {
+      cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+    }
 
     waitForState("Timeout waiting for replica get deleted", collectionName,
         (liveNodes, collectionState) -> collectionState.getSlice("shard1").getReplicas().size() == 2);
@@ -301,13 +310,22 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
         log.info("Running delete core {}",cd);
 
         try {
+          ZkController replica1ZkController = replica1Jetty.getCoreContainer().getZkController();
           ZkNodeProps m = new ZkNodeProps(
               Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
               ZkStateReader.CORE_NAME_PROP, replica1.getCoreName(),
               ZkStateReader.NODE_NAME_PROP, replica1.getNodeName(),
               ZkStateReader.COLLECTION_PROP, collectionName,
               ZkStateReader.CORE_NODE_NAME_PROP, replica1.getName());
-          cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+
+          if (replica1ZkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+            cluster.getOpenOverseer().getDistributedClusterStateUpdater().doSingleStateUpdate(
+                DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica, m,
+                cluster.getOpenOverseer().getSolrCloudManager(),
+                cluster.getOpenOverseer().getZkStateReader());
+          } else {
+            cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+          }
 
           boolean replicaDeleted = false;
           TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
@@ -345,8 +363,14 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     try {
       replica1Jetty.stop();
       waitForNodeLeave(replica1JettyNodeName);
-      waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState)
-          -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+
+      // There is a race condition: the replica might be marked down before we get here, in which case we never get notified.
+      // So we check before waiting... This does not eliminate the race window but significantly reduces it; eliminating it
+      // would require deeper changes in the code where the watcher is set.
+      if (getCollectionState(collectionName).getSlice("shard1").getReplica(replica1.getName()).getState() != DOWN) {
+        waitForState("Expected replica:" + replica1 + " get down", collectionName, (liveNodes, collectionState)
+            -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+      }
       replica1Jetty.start();
       waitingForReplicaGetDeleted.acquire();
     } finally {
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index 9f3f9a4..5c3aa01 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -94,13 +94,22 @@ public class DeleteShardTest extends SolrCloudTestCase {
     CloudSolrClient client = cluster.getSolrClient();
 
     // TODO can this be encapsulated better somewhere?
-    DistributedQueue inQueue =  cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
     Map<String, Object> propMap = new HashMap<>();
     propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
     propMap.put(slice, state.toString());
     propMap.put(ZkStateReader.COLLECTION_PROP, collection);
     ZkNodeProps m = new ZkNodeProps(propMap);
-    inQueue.offer(Utils.toJSON(m));
+
+    final Overseer overseer = cluster.getOpenOverseer();
+    if (overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      overseer.getDistributedClusterStateUpdater().doSingleStateUpdate(
+          DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState, m,
+          cluster.getOpenOverseer().getSolrCloudManager(),
+          cluster.getOpenOverseer().getZkStateReader());
+    } else {
+      DistributedQueue inQueue = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
+      inQueue.offer(Utils.toJSON(m));
+    }
 
     waitForState("Expected shard " + slice + " to be in state " + state.toString(), collection, (n, c) -> {
       return c.getSlice(slice).getState() == state;
diff --git a/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java b/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java
index 7281396..093fe1c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MockSolrSource.java
@@ -26,8 +26,17 @@ public class MockSolrSource  {
 
   public static ZkController makeSimpleMock(Overseer overseer, ZkStateReader reader, SolrZkClient zkClient) {
     ZkController zkControllerMock = mock(ZkController.class);
-    if (overseer == null) overseer = mock(Overseer.class);
-    
+    final DistributedClusterStateUpdater distributedClusterStateUpdater;
+    if (overseer == null) {
+      // When no overseer is passed, the Overseer queue does nothing. Replicate this behavior for distributed state
+      // updates by doing nothing as well...
+      distributedClusterStateUpdater = mock(DistributedClusterStateUpdater.class);
+      overseer = mock(Overseer.class);
+      when(overseer.getDistributedClusterStateUpdater()).thenReturn(distributedClusterStateUpdater);
+    } else {
+      // Use the same configuration for state updates as the Overseer.
+      distributedClusterStateUpdater = overseer.getDistributedClusterStateUpdater();
+    }
 
     if (reader != null && zkClient == null) {
       zkClient = reader.getZkClient();
@@ -38,11 +47,11 @@ public class MockSolrSource  {
       when(reader.getZkClient()).thenReturn(zkClient);
     }
      
-    
     when(zkControllerMock.getOverseer()).thenReturn(overseer);
     when(zkControllerMock.getZkStateReader()).thenReturn(reader);
     when(zkControllerMock.getZkClient()).thenReturn(zkClient);
     when(zkControllerMock.getOverseer()).thenReturn(overseer);
+    when(zkControllerMock.getDistributedClusterStateUpdater()).thenReturn(distributedClusterStateUpdater);
     return zkControllerMock;
   }
 }
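
The extended mock keeps "do nothing" consistent across both update paths: an unstubbed Mockito mock returns false
from isDistributedStateUpdate(), so code under test falls through to the equally inert mocked Overseer queue.
Hypothetical test usage:

    ZkController zk = MockSolrSource.makeSimpleMock(null, readerMock, zkClientMock);
    // Mockito's default boolean stub is false, so callers take the Overseer path.
    assertFalse(zk.getDistributedClusterStateUpdater().isDistributedStateUpdate());
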
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 40538c5..132d069 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -107,6 +107,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
   private static OverseerTaskQueue workQueueMock;
   private static OverseerTaskQueue stateUpdateQueueMock;
   private static Overseer overseerMock;
+  private static DistributedClusterStateUpdater distributedClusterStateUpdater;
+  private static DistributedClusterStateUpdater.StateChangeRecorder stateChangeRecorder;
   private static ZkController zkControllerMock;
   private static SolrCloudManager cloudDataProviderMock;
   private static ClusterStateProvider clusterStateProviderMock;
@@ -152,7 +154,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
         DistributedMap completedMap,
         DistributedMap failureMap,
         SolrMetricsContext solrMetricsContext) {
-      super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer.getStateUpdateQueue(), adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap, solrMetricsContext);
+      super(zkStateReader, myId, shardHandlerFactory, adminPath, new Stats(), overseer, new OverseerNodePrioritizer(zkStateReader, overseer, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap, solrMetricsContext);
     }
     
     @Override
@@ -177,6 +179,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     clusterStateMock = mock(ClusterState.class);
     solrZkClientMock = mock(SolrZkClient.class);
     overseerMock = mock(Overseer.class);
+    distributedClusterStateUpdater = mock(DistributedClusterStateUpdater.class);
+    stateChangeRecorder = mock(DistributedClusterStateUpdater.StateChangeRecorder.class);
     zkControllerMock = mock(ZkController.class);
     cloudDataProviderMock = mock(SolrCloudManager.class);
     objectCache = new ObjectCache();
@@ -204,6 +208,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     clusterStateMock = null;
     solrZkClientMock = null;
     overseerMock = null;
+    distributedClusterStateUpdater = null;
+    stateChangeRecorder = null;
     zkControllerMock = null;
     cloudDataProviderMock = null;
     clusterStateProviderMock = null;
@@ -232,6 +238,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     reset(clusterStateMock);
     reset(solrZkClientMock);
     reset(overseerMock);
+    reset(distributedClusterStateUpdater);
+    reset(stateChangeRecorder);
     reset(zkControllerMock);
     reset(cloudDataProviderMock);
     objectCache.clear();
@@ -259,7 +267,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
   }
 
   @SuppressWarnings("unchecked")
-  protected Set<String> commonMocks(int liveNodesCount) throws Exception {
+  protected Set<String> commonMocks(int liveNodesCount, boolean distributedClusterStateUpdates) throws Exception {
     when(shardHandlerFactoryMock.getShardHandler()).thenReturn(shardHandlerMock);
     when(workQueueMock.peekTopN(anyInt(), any(), anyLong())).thenAnswer(invocation -> {
       Object result;
@@ -379,6 +387,8 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     when(overseerMock.getZkController()).thenReturn(zkControllerMock);
     when(overseerMock.getSolrCloudManager()).thenReturn(cloudDataProviderMock);
     when(overseerMock.getCoreContainer()).thenReturn(coreContainerMock);
+    when(overseerMock.getDistributedClusterStateUpdater()).thenReturn(distributedClusterStateUpdater);
+    when(distributedClusterStateUpdater.createStateChangeRecorder(any(), anyBoolean())).thenReturn(stateChangeRecorder);
     when(coreContainerMock.getUpdateShardHandler()).thenReturn(updateShardHandlerMock);
     when(coreContainerMock.getPlacementPluginFactory()).thenReturn(placementPluginFactoryMock);
     when(updateShardHandlerMock.getDefaultHttpClient()).thenReturn(httpClientMock);
@@ -450,21 +460,45 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     
     when(overseerMock.getStateUpdateQueue(any())).thenReturn(stateUpdateQueueMock);
     when(overseerMock.getStateUpdateQueue()).thenReturn(stateUpdateQueueMock);
-    
-    Mockito.doAnswer(
-        new Answer<Void>() {
-          public Void answer(InvocationOnMock invocation) {
-            try {
-              handleCreateCollMessage(invocation.getArgument(0));
-              stateUpdateQueueMock.offer(invocation.getArgument(0));
-            } catch (KeeperException e) {
-              throw new RuntimeException(e);
-            } catch (InterruptedException e) {
-              throw new RuntimeException(e);
+
+    // Selecting the cluster state update strategy: Overseer when distributedClusterStateUpdates is false, otherwise distributed updates.
+    when(distributedClusterStateUpdater.isDistributedStateUpdate()).thenReturn(distributedClusterStateUpdates);
+
+    if (distributedClusterStateUpdates) {
+      // Mocking for state change via distributed updates. There are two types of updates done in CreateCollectionCmd:
+      // 1. Recording and executing a single command in one call
+      Mockito.doAnswer(
+          new Answer<Void>() {
+            public Void answer(InvocationOnMock invocation) {
+              handleCreateCollMessageProps(invocation.getArgument(1));
+              return null;
+            }}).when(distributedClusterStateUpdater).doSingleStateUpdate(any(), any(), any(), any());
+
+      // 2. Recording a command to be executed as part of a batch of commands
+      Mockito.doAnswer(
+          new Answer<Void>() {
+            public Void answer(InvocationOnMock invocation) {
+              handleCreateCollMessageProps(invocation.getArgument(1));
+              return null;
+            }}).when(stateChangeRecorder).record(any(), any());
+    } else {
+      // Mocking for state change via the Overseer queue
+      Mockito.doAnswer(
+          new Answer<Void>() {
+            public Void answer(InvocationOnMock invocation) {
+              try {
+                handleCreateCollMessage(invocation.getArgument(0));
+                stateUpdateQueueMock.offer(invocation.getArgument(0));
+              } catch (KeeperException e) {
+                throw new RuntimeException(e);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              return null;
             }
-            return null;
-          }}).when(overseerMock).offerStateUpdate(any());
-    
+          }).when(overseerMock).offerStateUpdate(any());
+    }
+
     when(zkControllerMock.getZkClient()).thenReturn(solrZkClientMock);
     
     when(cloudManagerMock.getDistribStateManager()).thenReturn(distribStateManagerMock);
@@ -520,9 +554,12 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
   }
 
   private void handleCreateCollMessage(byte[] bytes) {
+    handleCreateCollMessageProps(ZkNodeProps.load(bytes));
+  }
+
+  private void handleCreateCollMessageProps(ZkNodeProps props) {
     log.info("track created replicas / collections");
     try {
-      ZkNodeProps props = ZkNodeProps.load(bytes);
       if (CollectionParams.CollectionAction.CREATE.isEqual(props.getStr("operation"))) {
         String collName = props.getStr("name");
         if (collName != null) collectionsSet.put(collName, new ClusterState.CollectionRef(
@@ -732,12 +769,11 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
     SEND_NULL
   }
   protected void testTemplate(Integer numberOfNodes, Integer numberOfNodesToCreateOn, CreateNodeListOptions createNodeListOption, Integer replicationFactor,
-      Integer numberOfSlices,
-      boolean collectionExceptedToBeCreated) throws Exception {
+      Integer numberOfSlices, boolean collectionExceptedToBeCreated, boolean distributedClusterStateUpdates) throws Exception {
     assertTrue("Wrong usage of testTemplate. numberOfNodesToCreateOn " + numberOfNodesToCreateOn + " is not allowed to be higher than numberOfNodes " + numberOfNodes, numberOfNodes.intValue() >= numberOfNodesToCreateOn.intValue());
     assertTrue("Wrong usage of testTemplage. createNodeListOption has to be " + CreateNodeListOptions.SEND + " when numberOfNodes and numberOfNodesToCreateOn are unequal", ((createNodeListOption == CreateNodeListOptions.SEND) || (numberOfNodes.intValue() == numberOfNodesToCreateOn.intValue())));
     
-    Set<String> liveNodes = commonMocks(numberOfNodes);
+    Set<String> liveNodes = commonMocks(numberOfNodes, distributedClusterStateUpdates);
     List<String> createNodeList = new ArrayList<>();
     int i = 0;
     for (String node : liveNodes) {
@@ -774,140 +810,250 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
           createNodeList, dontShuffleCreateNodeSet);
     }
   }
-    @Test
-  public void testNoReplicationEqualNumberOfSlicesPerNode() throws Exception {
+
+  // Tests below are being run twice: once with Overseer based updates and once with distributed updates.
+  // This is done explicitly here because these tests use mocks that can be configured directly.
+  // Tests not using mocks (most other tests) but using the MiniSolrCloudCluster are randomized to sometimes use Overseer
+  // and sometimes distributed state updates (but not both for a given test and a given test seed).
+  // See the SolrCloudTestCase.Builder constructor and the rest of the Builder class.
+
+  @Test
+  public void testNoReplicationEqualNumberOfSlicesPerNodeOverseer() throws Exception {
+    testNoReplicationEqualNumberOfSlicesPerNodeInternal(false);
+  }
+
+  @Test
+  public void testNoReplicationEqualNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+    testNoReplicationEqualNumberOfSlicesPerNodeInternal(true);
+  }
+
+  private void testNoReplicationEqualNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
     Integer replicationFactor = 1;
     Integer numberOfSlices = 8;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
+  }
+
+  @Test
+  public void testReplicationEqualNumberOfSlicesPerNodeOverseer() throws Exception {
+    testReplicationEqualNumberOfSlicesPerNodeInternal(false);
   }
-  
   @Test
-  public void testReplicationEqualNumberOfSlicesPerNode() throws Exception {
+  public void testReplicationEqualNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+    testReplicationEqualNumberOfSlicesPerNodeInternal(true);
+  }
+
+  private void testReplicationEqualNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
     Integer replicationFactor = 2;
     Integer numberOfSlices = 4;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
+  @Test
+  public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesOverseer() throws Exception {
+    testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(false);
+  }
+
   @Test
-  public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodes() throws Exception {
+  public void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesDistributedUpdates() throws Exception {
+    testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(true);
+  }
+
+  private void testNoReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
     Integer replicationFactor = 1;
     Integer numberOfSlices = 8;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
   @Test
-  public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodes() throws Exception {
+  public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesOverseer() throws Exception {
+    testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(false);
+  }
+
+  @Test
+  public void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesDistributedUpdates() throws Exception {
+    testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(true);
+  }
+
+  private void testReplicationEqualNumberOfSlicesPerNodeSendCreateNodesEqualToLiveNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
     Integer replicationFactor = 2;
     Integer numberOfSlices = 4;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
+  @Test
+  public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesOverseer() throws Exception {
+    testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(false);
+  }
+
   @Test
-  public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodes() throws Exception {
+  public void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesDistributedUpdates() throws Exception {
+    testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(true);
+  }
+
+  private void testNoReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND_NULL;
     Integer replicationFactor = 1;
     Integer numberOfSlices = 8;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
+  @Test
+  public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesOverseer() throws Exception {
+    testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(false);
+  }
+
   @Test
-  public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodes() throws Exception {
+  public void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesDistributedUpdates() throws Exception {
+    testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(true);
+  }
+
+  private void testReplicationEqualNumberOfSlicesPerNodeSendNullCreateNodesInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND_NULL;
     Integer replicationFactor = 2;
     Integer numberOfSlices = 4;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
-  }  
-  
+        true, distributedClusterStateUpdates);
+  }
+
+  @Test
+  public void testNoReplicationUnequalNumberOfSlicesPerNodeOverseer() throws Exception {
+    testNoReplicationUnequalNumberOfSlicesPerNodeInternal(false);
+  }
+
   @Test
-  public void testNoReplicationUnequalNumberOfSlicesPerNode() throws Exception {
+  public void testNoReplicationUnequalNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+    testNoReplicationUnequalNumberOfSlicesPerNodeInternal(true);
+  }
+
+  private void testNoReplicationUnequalNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
     Integer replicationFactor = 1;
     Integer numberOfSlices = 6;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
+  @Test
+  public void testReplicationUnequalNumberOfSlicesPerNodeOverseer() throws Exception {
+    testReplicationUnequalNumberOfSlicesPerNodeInternal(false);
+  }
+
   @Test
-  public void testReplicationUnequalNumberOfSlicesPerNode() throws Exception {
+  public void testReplicationUnequalNumberOfSlicesPerNodeDistributedUpdates() throws Exception {
+    testReplicationUnequalNumberOfSlicesPerNodeInternal(true);
+  }
+
+  private void testReplicationUnequalNumberOfSlicesPerNodeInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 4;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.DONT_SEND;
     Integer replicationFactor = 2;
     Integer numberOfSlices = 3;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
   @Test
-  public void testNoReplicationLimitedNodesToCreateOn()
-      throws Exception {
+  public void testNoReplicationLimitedNodesToCreateOnOverseer() throws Exception {
+    testNoReplicationLimitedNodesToCreateOnInternal(false);
+  }
+
+  @Test
+  public void testNoReplicationLimitedNodesToCreateOnDistributedUpdates() throws Exception {
+    testNoReplicationLimitedNodesToCreateOnInternal(true);
+  }
+
+  private void testNoReplicationLimitedNodesToCreateOnInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 2;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
     Integer replicationFactor = 1;
     Integer numberOfSlices = 6;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
-  
+
+  @Test
+  public void testReplicationLimitedNodesToCreateOnOverseer() throws Exception {
+    testReplicationLimitedNodesToCreateOnInternal(false);
+  }
+
   @Test
-  public void testReplicationLimitedNodesToCreateOn()
-      throws Exception {
+  public void testReplicationLimitedNodesToCreateOnDistributedUpdates() throws Exception {
+    testReplicationLimitedNodesToCreateOnInternal(true);
+  }
+
+  private void testReplicationLimitedNodesToCreateOnInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 2;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
     Integer replicationFactor = 2;
     Integer numberOfSlices = 3;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        true);
+        true, distributedClusterStateUpdates);
   }
 
   @Test
-  public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimits()
-      throws Exception {
+  public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsOverseer() throws Exception {
+    testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(false);
+  }
+
+  @Test
+  public void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsDistributedUpdates() throws Exception {
+    testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(true);
+  }
+
+  private void testNoReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 3;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
     Integer replicationFactor = 1;
     Integer numberOfSlices = 8;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        false);
+        false, distributedClusterStateUpdates);
   }
-  
+
+  @Test
+  public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsOverseer() throws Exception {
+    testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(false);
+  }
+
   @Test
-  public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimits()
-      throws Exception {
+  public void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsDistributedUpdates() throws Exception {
+    testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(true);
+  }
+
+  private void testReplicationCollectionNotCreatedDueToMaxShardsPerNodeAndNodesToCreateOnLimitsInternal(boolean distributedClusterStateUpdates) throws Exception {
     Integer numberOfNodes = 4;
     Integer numberOfNodesToCreateOn = 3;
     CreateNodeListOptions createNodeListOptions = CreateNodeListOptions.SEND;
     Integer replicationFactor = 2;
     Integer numberOfSlices = 4;
     testTemplate(numberOfNodes, numberOfNodesToCreateOn, createNodeListOptions, replicationFactor, numberOfSlices,
-        false);
+        false, distributedClusterStateUpdates);
   }
 
 }
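The paired methods above all follow one convention: a shared *Internal body takes a distributedClusterStateUpdates flag, and two thin @Test wrappers pin it to false (Overseer) and true (distributed). For comparison, here is a sketch of the same two-mode matrix using stock JUnit 4 parameterization; this is illustrative only (hypothetical class, not part of this commit), since Solr's test framework runs under its own randomized runner, and the explicit pairs also keep each mode individually runnable by name:

    import java.util.Arrays;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;

    // Illustrative sketch: one parameterized body instead of a pair of methods per scenario.
    @RunWith(Parameterized.class)
    public class ExampleTwoModeTest {

      @Parameterized.Parameters(name = "distributedClusterStateUpdates={0}")
      public static Iterable<Object[]> modes() {
        return Arrays.asList(new Object[][] {{false}, {true}});
      }

      @Parameterized.Parameter
      public boolean distributedClusterStateUpdates;

      @Test
      public void testNoReplicationEqualNumberOfSlicesPerNode() throws Exception {
        // Would delegate to the shared template with the current mode, mirroring
        // the *Internal methods above.
      }
    }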
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
index e6d26f9..05974a3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
@@ -26,10 +26,7 @@ import java.util.function.Predicate;
 
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -157,9 +154,7 @@ public class OverseerRolesTest extends SolrCloudTestCase {
     String leaderId = OverseerCollectionConfigSetProcessor.getLeaderId(zkClient());
     String leader = OverseerCollectionConfigSetProcessor.getLeaderNode(zkClient());
     log.info("### Sending QUIT to overseer {}", leader);
-    getOverseerJetty().getCoreContainer().getZkController().getOverseer().getStateUpdateQueue()
-        .offer(Utils.toJSON(new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(),
-            "id", leaderId)));
+    getOverseerJetty().getCoreContainer().getZkController().getOverseer().sendQuitToOverseer(leaderId);
 
     waitForNewOverseer(15, s -> Objects.equals(leader, s) == false, false);
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
index 7293035..671ff3d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerStatusTest.java
@@ -49,8 +49,11 @@ public class OverseerStatusTest extends SolrCloudTestCase {
     SimpleOrderedMap<Object> createcollection
         = (SimpleOrderedMap<Object>) collection_operations.get(CollectionParams.CollectionAction.CREATE.toLower());
     assertEquals("No stats for create in OverseerCollectionProcessor", numCollectionCreates + 1, createcollection.get("requests"));
-    createcollection = (SimpleOrderedMap<Object>) overseer_operations.get(CollectionParams.CollectionAction.CREATE.toLower());
-    assertEquals("No stats for create in Overseer", numOverseerCreates + 1, createcollection.get("requests"));
+    // When cluster state updates are distributed, Overseer doesn't see the updates and doesn't report stats on them.
+    if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      createcollection = (SimpleOrderedMap<Object>) overseer_operations.get(CollectionParams.CollectionAction.CREATE.toLower());
+      assertEquals("No stats for create in Overseer", numOverseerCreates + 1, createcollection.get("requests"));
+    }
 
     // Reload the collection
     CollectionAdminRequest.reloadCollection(collectionName).process(cluster.getSolrClient());
@@ -81,19 +84,21 @@ public class OverseerStatusTest extends SolrCloudTestCase {
     assertNotNull(amIleader.get("errors"));
     assertNotNull(amIleader.get("avgTimePerRequest"));
 
-    amIleader = (SimpleOrderedMap<Object>) overseer_operations.get("am_i_leader");
-    assertNotNull("Overseer amILeader stats should not be null", amIleader);
-    assertNotNull(amIleader.get("requests"));
-    assertTrue(Integer.parseInt(amIleader.get("requests").toString()) > 0);
-    assertNotNull(amIleader.get("errors"));
-    assertNotNull(amIleader.get("avgTimePerRequest"));
-
-    SimpleOrderedMap<Object> updateState = (SimpleOrderedMap<Object>) overseer_operations.get("update_state");
-    assertNotNull("Overseer update_state stats should not be null", updateState);
-    assertNotNull(updateState.get("requests"));
-    assertTrue(Integer.parseInt(updateState.get("requests").toString()) > 0);
-    assertNotNull(updateState.get("errors"));
-    assertNotNull(updateState.get("avgTimePerRequest"));
+    // When cluster state updates are distributed, Overseer doesn't see the updates and doesn't report stats on them.
+    if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      amIleader = (SimpleOrderedMap<Object>) overseer_operations.get("am_i_leader");
+      assertNotNull("Overseer amILeader stats should not be null", amIleader);
+      assertNotNull(amIleader.get("requests"));
+      assertTrue(Integer.parseInt(amIleader.get("requests").toString()) > 0);
+      assertNotNull(amIleader.get("errors"));
+      assertNotNull(amIleader.get("avgTimePerRequest"));
 
+      SimpleOrderedMap<Object> updateState = (SimpleOrderedMap<Object>) overseer_operations.get("update_state");
+      assertNotNull("Overseer update_state stats should not be null", updateState);
+      assertNotNull(updateState.get("requests"));
+      assertTrue(Integer.parseInt(updateState.get("requests").toString()) > 0);
+      assertNotNull(updateState.get("errors"));
+      assertNotNull(updateState.get("avgTimePerRequest"));
+    }
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 1632617..a7c8e48 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -188,7 +188,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
           ZkStateReader.REPLICATION_FACTOR, "1",
           ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
           "createNodeSet", "");
-      ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
+      final Overseer overseer = MiniSolrCloudCluster.getOpenOverseer(overseers);
+      // This being an Overseer test, we force it to use Overseer-based cluster state updates. See the "new Overseer" calls in this class.
+      assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
+      ZkDistributedQueue q = overseer.getStateUpdateQueue();
       q.offer(Utils.toJSON(m));
     }
 
@@ -204,6 +207,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
             ZkStateReader.CORE_NAME_PROP, coreName,
             ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName,
             ZkStateReader.COLLECTION_PROP, collection);
+        assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
         ZkDistributedQueue q = overseer.getStateUpdateQueue();
         q.offer(Utils.toJSON(m));
         return null;
@@ -387,7 +391,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
         ZkStateReader.REPLICATION_FACTOR, "1",
         ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
         "createNodeSet", "");
-    ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+    ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
     q.offer(Utils.toJSON(m));
   }
 
@@ -530,7 +534,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
             ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
         List<ZkWriteCommand> commands = new NodeMutator().downNode(reader.getClusterState(), m);
 
-        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+        ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
 
         q.offer(Utils.toJSON(m));
 
@@ -586,7 +590,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+      ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
 
       createCollection(COLLECTION, 1);
 
@@ -640,7 +644,15 @@ public class OverseerTest extends SolrTestCaseJ4 {
   }
 
   private Overseer getOpenOverseer() {
-    return MiniSolrCloudCluster.getOpenOverseer(overseers);
+    Overseer overseer = MiniSolrCloudCluster.getOpenOverseer(overseers);
+    assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
+    return overseer;
+  }
+
+  private Overseer getOverseerZero() {
+    Overseer overseer = overseers.get(0);
+    assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
+    return overseer;
   }
 
   @Test
@@ -739,7 +751,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       httpShardHandlerFactory.init(new PluginInfo("shardHandlerFactory", Collections.emptyMap()));
       httpShardHandlerFactorys.add(httpShardHandlerFactory);
       Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
-          new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
+          new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").setUseDistributedClusterStateUpdates(false).build());
       overseers.add(overseer);
       ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
           server.getZkAddress().replaceAll("/", "_"));
@@ -897,7 +909,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
      electNewOverseer(server.getZkAddress());
 
-      // Create collection znode before repeatedly trying to enqueue the Cluster state change message
+      // Create collection znode before repeatedly trying to enqueue the cluster state update message
       zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, true);
 
       for (int i = 0; i < atLeast(4); i++) {
@@ -914,7 +926,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
                 ZkStateReader.REPLICATION_FACTOR, "1",
                 ZkStateReader.NUM_SHARDS_PROP, "1",
                 "createNodeSet", "");
-            ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
+            ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
             q.offer(Utils.toJSON(m));
             break;
           } catch (SolrException | KeeperException | AlreadyClosedException e) {
@@ -1104,7 +1116,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
             ZkStateReader.NUM_SHARDS_PROP, "1",
             ZkStateReader.REPLICATION_FACTOR, "1"
             );
-        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+        ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
         q.offer(Utils.toJSON(m));
       }
 
@@ -1117,7 +1129,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
             ZkStateReader.CORE_NODE_NAME_PROP, "node1",
             ZkStateReader.COLLECTION_PROP, "perf" + j,
             ZkStateReader.NUM_SHARDS_PROP, "1");
-        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+        ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
         q.offer(Utils.toJSON(m));
         if (j >= MAX_COLLECTIONS - 1) j = 0;
         if (k >= MAX_CORES - 1) k = 0;
@@ -1216,7 +1228,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
           ZkStateReader.ROLES_PROP, "",
           ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString());
       queue.offer(Utils.toJSON(m));
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
           ZkStateReader.NODE_NAME_PROP, "node1:8983_",
           ZkStateReader.SHARD_ID_PROP, "shard1",
           ZkStateReader.COLLECTION_PROP, COLLECTION,
@@ -1228,7 +1240,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       overseerClient = electNewOverseer(server.getZkAddress());
 
       //submit to proper queue
-      queue = overseers.get(0).getStateUpdateQueue();
+      queue = getOverseerZero().getStateUpdateQueue();
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
           ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr",
           ZkStateReader.SHARD_ID_PROP, "shard1",
@@ -1265,7 +1277,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+      ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
 
       createCollection("c1", 1);
 
@@ -1383,8 +1395,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
     ZkController zkController = createMockZkController(address, null, reader);
     zkControllers.add(zkController);
+    // Create an Overseer configured to NOT use distributed state updates; tests in this class really test the Overseer.
     Overseer overseer = new Overseer((HttpShardHandler) httpShardHandlerFactory.getShardHandler(), updateShardHandler, "/admin/cores", reader, zkController,
-        new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").build());
+        new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "").setUseDistributedClusterStateUpdates(false).build());
     overseers.add(overseer);
     ElectionContext ec = new OverseerElectionContext(zkClient, overseer,
         address.replaceAll("/", "_"));
@@ -1464,7 +1477,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       overseerClient = electNewOverseer(server.getZkAddress());
 
-      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+      ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
 
       // create collection
       {
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
index 5453574..3eac4c6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
@@ -181,8 +181,15 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase
     if (log.isInfoEnabled()) {
       log.info("Forcing {} to go into 'down' state", notLeader.getStr(ZkStateReader.CORE_NAME_PROP));
     }
-    ZkDistributedQueue q = jettys.get(0).getCoreContainer().getZkController().getOverseer().getStateUpdateQueue();
-    q.offer(Utils.toJSON(m));
+
+    final Overseer overseer = jettys.get(0).getCoreContainer().getZkController().getOverseer();
+    if (overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      overseer.getDistributedClusterStateUpdater().doSingleStateUpdate(
+          DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState, m, overseer.getSolrCloudManager(), overseer.getZkStateReader());
+    } else {
+      ZkDistributedQueue q = overseer.getStateUpdateQueue();
+      q.offer(Utils.toJSON(m));
+    }
 
     verifyReplicaStatus(cloudClient.getZkStateReader(), "football", "shard1", notLeader.getName(), Replica.State.DOWN);
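The if/else above is the calling convention this commit introduces for tests: when distributed updates are enabled, the mutation is applied synchronously through DistributedClusterStateUpdater; otherwise the message is serialized and enqueued for the Overseer. The same idiom appears again in ZkControllerTest below. A hypothetical helper (names assumed, not part of this commit, using the same imports as these tests) centralizing the idiom could look like:

    // Apply a cluster state mutation either directly (distributed mode) or via
    // the Overseer state update queue (legacy mode), mirroring the branch above.
    static void applyStateUpdate(Overseer overseer,
                                 DistributedClusterStateUpdater.MutatingCommand command,
                                 ZkNodeProps message) throws Exception {
      DistributedClusterStateUpdater updater = overseer.getDistributedClusterStateUpdater();
      if (updater.isDistributedStateUpdate()) {
        // Mutate state.json directly against ZooKeeper, bypassing the Overseer.
        updater.doSingleStateUpdate(command, message,
            overseer.getSolrCloudManager(), overseer.getZkStateReader());
      } else {
        // Legacy path: let the Overseer's state updater thread apply the message.
        overseer.getStateUpdateQueue().offer(Utils.toJSON(message));
      }
    }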
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
index ce72800..ada8bd3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestSkipOverseerOperations.java
@@ -121,7 +121,15 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
     waitForState("Expected 2x1 for collection: " + collection, collection,
         clusterShape(2, 2));
     CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
-    assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
+
+    // When cluster state updates are done in a distributed way, the stats this test verifies are not available.
+    // See the comment in OverseerStatusCmd.call().
+    // The rest of the test keeps running so that other failures can still be caught.
+    // Longer term, we could maintain per-node cluster state update stats and verify them here.
+
+    if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      assertEquals(getNumLeaderOpeations(resp), getNumLeaderOpeations(resp2));
+    }
     CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
   }
 
@@ -186,15 +194,30 @@ public class TestSkipOverseerOperations extends SolrCloudTestCase {
     waitForState("Expected 2x2 for collection: " + collection, collection,
         clusterShape(2, 4));
     CollectionAdminResponse resp2 = CollectionAdminRequest.getOverseerStatus().process(cluster.getSolrClient());
-    // 2 for recovering state, 4 for active state
-    assertEquals(getNumStateOpeations(resp) + 6, getNumStateOpeations(resp2));
+
+    // See the comment in testSkipLeaderOperations() above for why this assert is skipped
+    if (!cluster.getOpenOverseer().getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+      // 2 for recovering state, 4 for active state
+      assertEquals(getNumStateOpeations(resp) + 6, getNumStateOpeations(resp2));
+    }
     CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
   }
 
+  /**
+   * Returns the value of the stat "overseer_operations" / "leader" / "requests".
+   * This stat (see {@link org.apache.solr.cloud.api.collections.OverseerStatusCmd}) is updated when the cluster state
+   * updater processes a message of type {@link org.apache.solr.cloud.overseer.OverseerAction#LEADER} to set a shard leader.<p>
+   *
+   * The update happens in org.apache.solr.cloud.Overseer.ClusterStateUpdater.processQueueItem().
+   */
   private int getNumLeaderOpeations(CollectionAdminResponse resp) {
     return (int) resp.getResponse().findRecursive("overseer_operations", "leader", "requests");
   }
 
+  /**
+   * "state" stats are updated when Overseer processes a {@link org.apache.solr.cloud.overseer.OverseerAction#STATE} message
+   * that sets replica properties.
+   */
   private int getNumStateOpeations(CollectionAdminResponse resp) {
     return (int) resp.getResponse().findRecursive("overseer_operations", "state", "requests");
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 8018e78..ae9001b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -288,7 +288,12 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
             CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.NUM_SHARDS_PROP, "1",
             "name", collectionName);
-        zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
+        if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.ClusterCreateCollection, m,
+              zkController.getSolrCloudManager(), zkController.getZkStateReader());
+        } else {
+          zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
+        }
 
         HashMap<String, Object> propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -297,7 +302,12 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
         propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host1");
         propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
         propMap.put(ZkStateReader.STATE_PROP, "active");
-        zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+        if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, new ZkNodeProps(propMap),
+              zkController.getSolrCloudManager(), zkController.getZkStateReader());
+        } else {
+          zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+        }
 
         propMap = new HashMap<>();
         propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
@@ -306,7 +316,12 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
         propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host2");
         propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
         propMap.put(ZkStateReader.STATE_PROP, "down");
-        zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+        if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
+          zkController.getDistributedClusterStateUpdater().doSingleStateUpdate(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, new ZkNodeProps(propMap),
+              zkController.getSolrCloudManager(), zkController.getZkStateReader());
+        } else {
+          zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+        }
 
         zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
 
diff --git a/solr/server/solr/solr.xml b/solr/server/solr/solr.xml
index e34d0a23..0fc94e2 100644
--- a/solr/server/solr/solr.xml
+++ b/solr/server/solr/solr.xml
@@ -45,6 +45,7 @@
     <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
     <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
     <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
+    <bool name="distributedClusterStateUpdates">${distributedClusterStateUpdates:false}</bool>
 
   </solrcloud>
 
diff --git a/solr/solr-ref-guide/src/format-of-solr-xml.adoc b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
index 458b936..bdae4e5 100644
--- a/solr/solr-ref-guide/src/format-of-solr-xml.adoc
+++ b/solr/solr-ref-guide/src/format-of-solr-xml.adoc
@@ -42,6 +42,7 @@ You can find `solr.xml` in your `$SOLR_HOME` directory (usually `server/solr` or
     <int name="distribUpdateConnTimeout">${distribUpdateConnTimeout:60000}</int>
     <str name="zkCredentialsProvider">${zkCredentialsProvider:org.apache.solr.common.cloud.DefaultZkCredentialsProvider}</str>
     <str name="zkACLProvider">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str>
+    <bool name="distributedClusterStateUpdates">${distributedClusterStateUpdates:false}</bool>
   </solrcloud>
 
   <shardHandlerFactory name="shardHandlerFactory"
@@ -163,6 +164,9 @@ If `TRUE`, node names are not based on the address of the node, but on a generic
 Optional parameters that can be specified if you are using <<zookeeper-access-control.adoc#,ZooKeeper Access Control>>.
 
 
+`distributedClusterStateUpdates`::
+If `TRUE`, SolrCloud no longer routes collections' `state.json` updates through the Overseer but performs them directly against ZooKeeper.
+
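Since the stock `solr.xml` shown above reads this value through the `${distributedClusterStateUpdates:false}` placeholder, the setting can presumably also be flipped without editing the file by starting nodes with the matching system property, e.g. `-DdistributedClusterStateUpdates=true`.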
 === The <logging> Element
 
 `class`::
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 0f5bb53..02bf6f9 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -42,7 +42,6 @@ public class PerReplicaStatesOps {
     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private PerReplicaStates rs;
     List<PerReplicaStates.Operation> ops;
-    private boolean preOp = true;
     final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
 
     PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
@@ -255,7 +254,6 @@ public class PerReplicaStatesOps {
             }
             return operations;
         });
-        result.preOp = false;
         result.ops = result.refresh(null);
         return result;
     }
@@ -278,13 +276,6 @@ public class PerReplicaStatesOps {
     }
 
     /**
-     * To be executed before collection state.json is persisted
-     */
-    public boolean isPreOp() {
-        return preOp;
-    }
-
-    /**
      * This method should compute the set of ZK operations for a given action
      * for instance, a state change may result in 2 operations on per-replica states (1 CREATE and 1 DELETE)
      * if a multi operation fails because the state got modified from behind,
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index ccd93c6..baae22b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -410,7 +410,7 @@ public class ZkStateReader implements SolrCloseable {
       if (log.isDebugEnabled()) {
         log.debug("Server older than client {}<{}", collection.getZNodeVersion(), version);
       }
-      DocCollection nu = getCollectionLive(this, coll);
+      DocCollection nu = getCollectionLive(coll);
       if (nu == null) return -1;
       if (nu.getZNodeVersion() > collection.getZNodeVersion()) {
         if (updateWatchedCollection(coll, nu)) {
@@ -680,7 +680,7 @@ public class ZkStateReader implements SolrCloseable {
           }
         }
         if (shouldFetch) {
-          cachedDocCollection = getCollectionLive(ZkStateReader.this, collName);
+          cachedDocCollection = getCollectionLive(collName);
           lastUpdateTime = System.nanoTime();
         }
       }
@@ -1193,7 +1193,7 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   /**
-   * Watches a single collection's format2 state.json.
+   * Watches a single collection's state.json.
    */
   class StateWatcher implements Watcher {
     private final String coll;
@@ -1446,9 +1446,9 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-  public static DocCollection getCollectionLive(ZkStateReader zkStateReader, String coll) {
+  public DocCollection getCollectionLive(String coll) {
     try {
-      return zkStateReader.fetchCollectionState(coll, null);
+      return fetchCollectionState(coll, null);
     } catch (KeeperException e) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Could not load collection from ZK: " + coll, e);
     } catch (InterruptedException e) {
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index 7cd09a1..c248db8 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -120,6 +120,7 @@ public class MiniSolrCloudCluster {
       "    <str name=\"zkACLProvider\">${zkACLProvider:org.apache.solr.common.cloud.DefaultZkACLProvider}</str> \n" +
       "    <str name=\"pkiHandlerPrivateKeyPath\">${pkiHandlerPrivateKeyPath:cryptokeys/priv_key512_pkcs8.pem}</str> \n" +
       "    <str name=\"pkiHandlerPublicKeyPath\">${pkiHandlerPublicKeyPath:cryptokeys/pub_key512.der}</str> \n" +
+      "    <str name=\"distributedClusterStateUpdates\">${solr.distributedClusterStateUpdates:false}</str> \n" +
       "  </solrcloud>\n" +
       // NOTE: this turns off the metrics collection unless overriden by a sysprop
       "  <metrics enabled=\"${metricsEnabled:false}\">\n" +
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
index c6f26c6..35c87e7 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -109,6 +110,7 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
     private Map<String, Object> clusterProperties = new HashMap<>();
 
     private boolean trackJettyMetrics;
+    private boolean useDistributedClusterStateUpdate;
 
     /**
      * Create a builder
@@ -119,6 +121,8 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
     public Builder(int nodeCount, Path baseDir) {
       this.nodeCount = nodeCount;
       this.baseDir = baseDir;
+      // By default, the MiniSolrCloudCluster being built will randomly (seed-based) decide which cluster state update strategy to use.
+      this.useDistributedClusterStateUpdate = LuceneTestCase.random().nextInt(2) == 0;
     }
 
     /**
@@ -187,6 +191,48 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
     }
 
     /**
+     * This method makes the MiniSolrCloudCluster use the "other" cluster state update strategy than it normally would.
+     * When some test classes call this method (and some don't) we make sure that a run of multiple tests with a single
+     * seed will exercise both code lines (distributed updates and Overseer based updates) so regressions can be spotted
+     * faster.<p>
+     *
+     * The real need is for a few tests covering reasonable use cases to call this method. If you're adding a new test,
+     * you don't have to call it (but it's ok if you do).
+     */
+    public Builder useOtherClusterStateUpdateStrategy() {
+      useDistributedClusterStateUpdate = !useDistributedClusterStateUpdate;
+      return this;
+    }
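A hedged usage sketch, assuming the usual SolrCloudTestCase pattern where the static configureCluster(int) returns this Builder, configure() builds the cluster, and @BeforeClass comes from JUnit:

    // Flip whichever strategy the seed picked, so runs of the suite with a
    // single seed cover both code paths across test classes.
    @BeforeClass
    public static void setupCluster() throws Exception {
      configureCluster(2)
          .useOtherClusterStateUpdateStrategy()
          .configure();
    }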
+
+    /**
+     * Forces the cluster state update strategy to be either Overseer-based or distributed. <b>This method can be useful
+     * when debugging tests</b> that fail in only one of the two modes, so that all local runs exhibit the issue, and
+     * obviously for tests that are not compatible with one of the two modes.
+     * <p>
+     * If this method is not called, the strategy being used will be random if the configuration passed to the cluster
+     * ({@code solr.xml} equivalent) contains a placeholder similar to:
+     * <pre>
+     * {@code
+     * <solrcloud>
+     *   ....
+     *   <str name="distributedClusterStateUpdates">${solr.distributedClusterStateUpdates:false}</str>
+     *   ....
+     * </solrcloud>
+     * }</pre>
+     * For an example of a configuration supporting this setting, see {@link MiniSolrCloudCluster#DEFAULT_CLOUD_SOLR_XML}.
+     * When a test sets a different {@code solr.xml} config (using {@link #withSolrXml}), if the config does not contain
+     * the placeholder, the strategy will be defined by the value assigned to {@code useDistributedClusterStateUpdates}
+     * in {@link org.apache.solr.core.CloudConfig.CloudConfigBuilder}.
+     *
+     * @param distributed When {@code true}, cluster state updates are handled in a distributed way by nodes. When
+     *                    {@code false}, cluster state updates are handled by Overseer.
+     */
+    public Builder withDistributedClusterStateUpdates(boolean distributed) {
+      useDistributedClusterStateUpdate = distributed;
+      return this;
+    }
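And, under the same assumptions as the sketch above, forcing a specific mode, e.g. to reproduce locally a failure seen only with distributed updates:

    // Force distributed cluster state updates for every node in the mini cluster.
    @BeforeClass
    public static void setupCluster() throws Exception {
      configureCluster(2)
          .withDistributedClusterStateUpdates(true)
          .configure();
    }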
+
+    /**
      * Set a cluster property
      *
      * @param propertyName  the property name
@@ -217,6 +263,14 @@ public class SolrCloudTestCase extends SolrTestCaseJ4 {
      * @throws Exception if an error occurs on startup
      */
     public MiniSolrCloudCluster build() throws Exception {
+      // This impacts how the MiniSolrCloudCluster, and therefore the test, runs if the config being used has the
+      // appropriate placeholder.
+      // This is a good place to hard-code true or false instead of useDistributedClusterStateUpdate in order to run
+      // all qualifying tests with a given cluster state update strategy. (Non-qualifying tests will use the default
+      // value assigned to useDistributedClusterStateUpdates in org.apache.solr.core.CloudConfig.CloudConfigBuilder,
+      // so if you really want ALL tests to run with a given strategy, patch it there too, and revert before commit!)
+      System.setProperty("solr.distributedClusterStateUpdates", Boolean.toString(useDistributedClusterStateUpdate));
+
       JettyConfig jettyConfig = jettyConfigBuilder.build();
       MiniSolrCloudCluster cluster = new MiniSolrCloudCluster(nodeCount, baseDir, solrxml, jettyConfig,
           null, securityJson, trackJettyMetrics);