You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by md...@apache.org on 2020/06/09 18:59:30 UTC

[lucene-solr] branch master updated: SOLR-12823: remove /clusterstate.json (#1528)

This is an automated email from the ASF dual-hosted git repository.

mdrob pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab9b81  SOLR-12823: remove /clusterstate.json (#1528)
1ab9b81 is described below

commit 1ab9b811c65abb3d1a827c87b4f1135116ff90eb
Author: murblanc <43...@users.noreply.github.com>
AuthorDate: Tue Jun 9 20:59:17 2020 +0200

    SOLR-12823: remove /clusterstate.json (#1528)
    
    * SOLR-12823: remove /clusterstate.json
    
    Remove all code dealing with Zookeeper's /clusterstate.json, remove Collection API's MIGRATESTATEFORMAT, remove legacyCloud option.
    
    Also fixes SOLR-11877 DocCollection.getStateFormat is buggy
    
    Co-authored-by: Ilan Ginzburg <ig...@salesforce.com>
---
 solr/CHANGES.txt                                   |   3 +
 .../src/java/org/apache/solr/cloud/Overseer.java   |  15 +-
 .../java/org/apache/solr/cloud/ZkController.java   | 134 ++++++++------
 .../solr/cloud/api/collections/AddReplicaCmd.java  |  40 ++---
 .../solr/cloud/api/collections/BackupCmd.java      |   4 +-
 .../cloud/api/collections/CreateCollectionCmd.java |  49 +++--
 .../OverseerCollectionMessageHandler.java          |  43 +----
 .../api/collections/ReindexCollectionCmd.java      |   2 -
 .../solr/cloud/api/collections/RestoreCmd.java     |  12 +-
 .../autoscaling/sim/SimClusterStateProvider.java   |   4 +-
 .../sim/SnapshotClusterStateProvider.java          |  12 +-
 .../solr/cloud/overseer/ClusterStateMutator.java   |  18 +-
 .../solr/cloud/overseer/CollectionMutator.java     |   2 +-
 .../apache/solr/cloud/overseer/ReplicaMutator.java |  44 ++---
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  43 +----
 .../java/org/apache/solr/core/CoreContainer.java   |  45 +----
 .../org/apache/solr/core/backup/BackupManager.java |   2 +-
 .../apache/solr/handler/admin/ClusterStatus.java   |  32 +---
 .../org/apache/solr/handler/admin/ColStatus.java   |   1 -
 .../solr/handler/admin/CollectionsHandler.java     |   9 +-
 .../solr/handler/admin/ZookeeperInfoHandler.java   | 200 ++++++++++-----------
 .../src/java/org/apache/solr/util/SolrCLI.java     |   1 -
 .../test/org/apache/solr/cloud/BasicZkTest.java    | 181 -------------------
 .../apache/solr/cloud/ClusterStateMockUtil.java    |   2 +-
 .../org/apache/solr/cloud/ClusterStateTest.java    |   8 +-
 .../org/apache/solr/cloud/CollectionPropsTest.java |   4 -
 ...mat2Test.java => CollectionStateZnodeTest.java} |   7 +-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  26 +--
 .../apache/solr/cloud/CreateRoutedAliasTest.java   |   2 -
 .../solr/cloud/DeleteInactiveReplicaTest.java      |   2 -
 .../org/apache/solr/cloud/DeleteReplicaTest.java   |  34 +---
 .../solr/cloud/LegacyCloudClusterPropTest.java     | 180 -------------------
 .../org/apache/solr/cloud/MigrateRouteKeyTest.java |   5 -
 .../OverseerCollectionConfigSetProcessorTest.java  |   2 -
 .../test/org/apache/solr/cloud/OverseerTest.java   | 155 ++++++++--------
 .../apache/solr/cloud/ShardRoutingCustomTest.java  |   3 -
 .../cloud/SharedFSAutoReplicaFailoverTest.java     |   5 -
 .../test/org/apache/solr/cloud/SliceStateTest.java |   4 +-
 .../apache/solr/cloud/TestClusterProperties.java   |  10 +-
 .../org/apache/solr/cloud/TestPullReplica.java     |   5 -
 .../solr/cloud/TestPullReplicaErrorHandling.java   |  15 --
 .../org/apache/solr/cloud/TestTlogReplica.java     |   5 -
 .../test/org/apache/solr/cloud/TestZkChroot.java   | 153 ----------------
 .../org/apache/solr/cloud/ZkControllerTest.java    |   3 +-
 .../AbstractCloudBackupRestoreTestCase.java        |   2 -
 .../CollectionsAPIAsyncDistributedZkTest.java      |  16 +-
 .../solr/cloud/api/collections/ShardSplitTest.java |  15 --
 .../SimpleCollectionCreateDeleteTest.java          |   6 +-
 .../cloud/api/collections/TestCollectionAPI.java   |  31 ----
 .../cloud/overseer/TestClusterStateMutator.java    |   4 +-
 .../overseer/ZkCollectionPropsCachingTest.java     |   4 -
 .../solr/cloud/overseer/ZkStateReaderTest.java     | 111 +-----------
 .../solr/cloud/overseer/ZkStateWriterTest.java     | 149 ++-------------
 .../core/snapshots/TestSolrCloudSnapshots.java     |   2 +-
 .../solr/handler/admin/TestCollectionAPIs.java     |   8 +-
 .../component/TestHttpShardHandlerFactory.java     |   2 +-
 .../src/test/org/apache/solr/util/TestUtils.java   |   2 +-
 .../src/cluster-node-management.adoc               |  21 +--
 solr/solr-ref-guide/src/collection-management.adoc |   2 -
 .../src/major-changes-in-solr-9.adoc               |  10 ++
 .../src/rule-based-replica-placement.adoc          |   2 +-
 solr/solr-ref-guide/src/shard-management.adoc      |   2 +-
 .../client/solrj/cloud/autoscaling/Policy.java     |   8 +-
 .../client/solrj/impl/BaseCloudSolrClient.java     |  23 ++-
 .../solrj/impl/BaseHttpClusterStateProvider.java   |   3 +-
 .../solrj/request/CollectionAdminRequest.java      |  36 ----
 .../org/apache/solr/common/cloud/ClusterState.java |  83 +++------
 .../apache/solr/common/cloud/DocCollection.java    |  25 +--
 .../org/apache/solr/common/cloud/SolrZkClient.java |   7 +-
 .../apache/solr/common/cloud/ZkStateReader.java    | 187 ++++---------------
 .../solr/common/params/CollectionParams.java       |   3 +-
 .../src/resources/apispec/cluster.Commands.json    |   3 -
 ...collections.collection.shards.shard.delete.json |   2 +-
 ...ons.collection.shards.shard.replica.delete.json |   2 +-
 .../client/solrj/cloud/autoscaling/TestPolicy.java |  29 ++-
 .../solrj/cloud/autoscaling/TestPolicy2.java       |   4 +-
 .../solrj/impl/CloudSolrClientCacheTest.java       |   3 +-
 .../cloud/TestCloudCollectionsListeners.java       | 110 ------------
 .../common/cloud/TestCollectionStateWatchers.java  |  23 ---
 .../common/cloud/TestDocCollectionWatcher.java     |  26 ---
 .../solr/cloud/AbstractFullDistribZkTestBase.java  |  15 --
 .../java/org/apache/solr/cloud/ZkTestServer.java   |   4 -
 82 files changed, 520 insertions(+), 1991 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b65b558..a02bd07 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -74,6 +74,9 @@ Other Changes
 * SOLR-14486: Autoscaling simulation framework no longer creates /clusterstate.json (format 1),
   instead it creates individual per-collection /state.json files (format 2). (ab)
 
+ * SOLR-12823: Remove /clusterstate.json support: support for collections created with stateFormat=1,
+   as well as support for Collection API MIGRATESTATEFORMAT action and support for the legacyCloud flag (Ilan Ginzburg).
+
 ==================  8.6.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index dd01368..665e6db 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -35,7 +35,6 @@ import org.apache.lucene.util.Version;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.cloud.api.collections.CreateCollectionCmd;
@@ -304,7 +303,7 @@ public class Overseer implements SolrCloseable {
                 byte[] data = head.second();
                 final ZkNodeProps message = ZkNodeProps.load(data);
                 if (log.isDebugEnabled()) {
-                  log.debug("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getZkStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
+                  log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
                 }
 
                 processedNodes.add(head.first());
@@ -461,8 +460,6 @@ public class Overseer implements SolrCloseable {
           case MODIFYCOLLECTION:
             CollectionsHandler.verifyRuleParams(zkController.getCoreContainer() ,message.getProperties());
             return Collections.singletonList(new CollectionMutator(getSolrCloudManager()).modifyCollection(clusterState,message));
-          case MIGRATESTATEFORMAT:
-            return Collections.singletonList(new ClusterStateMutator(getSolrCloudManager()).migrateStateFormat(clusterState, message));
           default:
             throw new RuntimeException("unknown operation:" + operation
                 + " contents:" + message.getProperties());
@@ -1034,16 +1031,6 @@ public class Overseer implements SolrCloseable {
     }
   }
   
-  public static boolean isLegacy(ZkStateReader stateReader) {
-    String legacyProperty = stateReader.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
-    return "true".equals(legacyProperty);
-  }
-
-  public static boolean isLegacy(ClusterStateProvider clusterStateProvider) {
-    String legacyProperty = clusterStateProvider.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
-    return "true".equals(legacyProperty);
-  }
-
   public ZkStateReader getZkStateReader() {
     return reader;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 2cd376c..3aa04a8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -29,6 +29,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -470,6 +471,8 @@ public class ZkController implements Closeable {
         return cc.isShutDown();
       }});
 
+    // Refuse to start if ZK has a non empty /clusterstate.json
+    checkNoOldClusterstate(zkClient);
 
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
@@ -491,6 +494,41 @@ public class ZkController implements Closeable {
     assert ObjectReleaseTracker.track(this);
   }
 
+  /**
+   * <p>Verifies if /clusterstate.json exists in Zookeeper, and if it does and is not empty, refuses to start and outputs
+   * a helpful message regarding collection migration.</p>
+   *
+   * <p>If /clusterstate.json exists and is empty, it is removed.</p>
+   */
+  private void checkNoOldClusterstate(final SolrZkClient zkClient) throws InterruptedException {
+    try {
+      if (!zkClient.exists(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, true)) {
+        return;
+      }
+
+      final byte[] data = zkClient.getData(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, null, null, true);
+
+      if (Arrays.equals("{}".getBytes(StandardCharsets.UTF_8), data)) {
+        // Empty json. This log will only occur once.
+        log.warn("{} no longer supported starting with Solr 9. Found empty file on Zookeeper, deleting it.", ZkStateReader.UNSUPPORTED_CLUSTER_STATE);
+        zkClient.delete(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, -1, true);
+      } else {
+        // /clusterstate.json not empty: refuse to start but do not automatically delete. A bit of a pain but user shouldn't
+        // have older collections at this stage anyway.
+        String message = ZkStateReader.UNSUPPORTED_CLUSTER_STATE + " no longer supported starting with Solr 9. "
+            + "It is present and not empty. Cannot start Solr. Please first migrate collections to stateFormat=2 using an "
+            + "older version of Solr or if you don't care about the data then delete the file from "
+            + "Zookeeper using a command line tool, for example: bin/solr zk rm /clusterstate.json -z host:port";
+        log.error(message);
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, message);
+      }
+    } catch (KeeperException e) {
+      // Convert checked exception to one acceptable by the caller (see also init() further down)
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
+  }
+
   public int getLeaderVoteWait() {
     return leaderVoteWait;
   }
@@ -863,7 +901,6 @@ public class ZkController implements Closeable {
     cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, zkClient);
     byte[] emptyJson = "{}".getBytes(StandardCharsets.UTF_8);
-    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, emptyJson, CreateMode.PERSISTENT, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_SECURITY_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
     cmdExecutor.ensureExists(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, emptyJson, CreateMode.PERSISTENT, zkClient);
     bootstrapDefaultConfigSet(zkClient);
@@ -1190,8 +1227,8 @@ public class ZkController implements Closeable {
 
       // check replica's existence in clusterstate first
       try {
-        zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
-            TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+        zkStateReader.waitForState(collection, 100, TimeUnit.MILLISECONDS,
+            (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
       } catch (TimeoutException e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
       }
@@ -1568,9 +1605,7 @@ public class ZkController implements Closeable {
       props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
       props.put(ZkStateReader.COLLECTION_PROP, collection);
       props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
-      if (!Overseer.isLegacy(zkStateReader)) {
-        props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
-      }
+      props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
       if (numShards != null) {
         props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
       }
@@ -1814,69 +1849,54 @@ public class ZkController implements Closeable {
 
   /**
    * On startup, the node already published all of its replicas as DOWN,
-   * so in case of legacyCloud=false ( the replica must already present on Zk )
    * we can skip publish the replica as down
    * @return Should publish the replica as down on startup
    */
   private boolean isPublishAsDownOnStartup(CloudDescriptor cloudDesc) {
-    if (!Overseer.isLegacy(zkStateReader)) {
       Replica replica = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName())
           .getSlice(cloudDesc.getShardId())
           .getReplica(cloudDesc.getCoreNodeName());
-      if (replica.getNodeName().equals(getNodeName())) {
-        return false;
-      }
-    }
-    return true;
+      return !replica.getNodeName().equals(getNodeName());
   }
 
   private void checkStateInZk(CoreDescriptor cd) throws InterruptedException, NotInClusterStateException {
-    if (!Overseer.isLegacy(zkStateReader)) {
-      CloudDescriptor cloudDesc = cd.getCloudDescriptor();
-      String nodeName = cloudDesc.getCoreNodeName();
-      if (nodeName == null) {
-        if (cc.repairCoreProperty(cd, CoreDescriptor.CORE_NODE_NAME) == false) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "No coreNodeName for " + cd);
-        }
-        nodeName = cloudDesc.getCoreNodeName();
-        // verify that the repair worked.
-        if (nodeName == null) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "No coreNodeName for " + cd);
-        }
-      }
-      final String coreNodeName = nodeName;
+    CloudDescriptor cloudDesc = cd.getCloudDescriptor();
+    String nodeName = cloudDesc.getCoreNodeName();
+    if (nodeName == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No coreNodeName for " + cd);
+    }
+    final String coreNodeName = nodeName;
 
-      if (cloudDesc.getShardId() == null) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for " + cd);
-      }
+    if (cloudDesc.getShardId() == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for " + cd);
+    }
 
-      AtomicReference<String> errorMessage = new AtomicReference<>();
-      AtomicReference<DocCollection> collectionState = new AtomicReference<>();
-      try {
-        zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
-          collectionState.set(c);
-          if (c == null)
-            return false;
-          Slice slice = c.getSlice(cloudDesc.getShardId());
-          if (slice == null) {
-            errorMessage.set("Invalid shard: " + cloudDesc.getShardId());
-            return false;
-          }
-          Replica replica = slice.getReplica(coreNodeName);
-          if (replica == null) {
-            errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
-                ", ignore the exception if the replica was deleted");
-            return false;
-          }
-          return true;
-        });
-      } catch (TimeoutException e) {
-        String error = errorMessage.get();
-        if (error == null)
-          error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
-              ", ignore the exception if the replica was deleted";
-        throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
-      }
+    AtomicReference<String> errorMessage = new AtomicReference<>();
+    AtomicReference<DocCollection> collectionState = new AtomicReference<>();
+    try {
+      zkStateReader.waitForState(cd.getCollectionName(), 10, TimeUnit.SECONDS, (c) -> {
+        collectionState.set(c);
+        if (c == null)
+          return false;
+        Slice slice = c.getSlice(cloudDesc.getShardId());
+        if (slice == null) {
+          errorMessage.set("Invalid shard: " + cloudDesc.getShardId());
+          return false;
+        }
+        Replica replica = slice.getReplica(coreNodeName);
+        if (replica == null) {
+          errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
+              ", ignore the exception if the replica was deleted");
+          return false;
+        }
+        return true;
+      });
+    } catch (TimeoutException e) {
+      String error = errorMessage.get();
+      if (error == null)
+        error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
+            ", ignore the exception if the replica was deleted";
+      throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 02d9fd7..95fffa4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -242,29 +242,27 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     ModifiableSolrParams params = new ModifiableSolrParams();
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-    if (!Overseer.isLegacy(zkStateReader)) {
-      if (!skipCreateReplicaInClusterState) {
-        ZkNodeProps props = new ZkNodeProps(
-            Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
-            ZkStateReader.COLLECTION_PROP, collectionName,
-            ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
-            ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
-            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node),
-            ZkStateReader.NODE_NAME_PROP, createReplica.node,
-            ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
-        if (createReplica.coreNodeName != null) {
-          props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
-        }
-        try {
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-        } catch (Exception e) {
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
-        }
+    if (!skipCreateReplicaInClusterState) {
+      ZkNodeProps props = new ZkNodeProps(
+          Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+          ZkStateReader.COLLECTION_PROP, collectionName,
+          ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
+          ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
+          ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+          ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node),
+          ZkStateReader.NODE_NAME_PROP, createReplica.node,
+          ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
+      if (createReplica.coreNodeName != null) {
+        props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
+      }
+      try {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
       }
-      params.set(CoreAdminParams.CORE_NODE_NAME,
-          ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName());
     }
+    params.set(CoreAdminParams.CORE_NODE_NAME,
+        ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName)).get(createReplica.coreName).getName());
 
     String configName = zkStateReader.readConfigName(collectionName);
     String routeKey = message.getStr(ShardParams._ROUTE_);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index e873669..68565f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -113,8 +113,8 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     String configName = ocmh.zkStateReader.readConfigName(collectionName);
     backupMgr.downloadConfigDir(location, backupName, configName);
 
-    //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
-    //Since we don't want to distinguish we extract the state and back it up as a separate json
+    //Save the collection's state (coming from the collection's state.json)
+    //We extract the state and back it up as a separate json
     DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
     backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4f00253..6498c8b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -155,9 +155,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       final String async = message.getStr(ASYNC);
 
       ZkStateReader zkStateReader = ocmh.zkStateReader;
-      boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
 
-      OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
+      OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName);
 
       Map<String,String> collectionParams = new HashMap<>();
       Map<String,Object> collectionProps = message.getProperties();
@@ -237,21 +236,19 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         }
 
         String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
-        //in the new mode, create the replica in clusterstate prior to creating the core.
+        // create the replica in the collection's state.json in ZK prior to creating the core.
         // Otherwise the core creation fails
-        if (!isLegacyCloud) {
-          ZkNodeProps props = new ZkNodeProps(
-              Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
-              ZkStateReader.COLLECTION_PROP, collectionName,
-              ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
-              ZkStateReader.CORE_NAME_PROP, coreName,
-              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, baseUrl,
-              ZkStateReader.NODE_NAME_PROP, nodeName,
-              ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
-              CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-        }
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+            ZkStateReader.COLLECTION_PROP, collectionName,
+            ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
+            ZkStateReader.CORE_NAME_PROP, coreName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, baseUrl,
+            ZkStateReader.NODE_NAME_PROP, nodeName,
+            ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
+            CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
         // Need to create new params for each request
         ModifiableSolrParams params = new ModifiableSolrParams();
@@ -280,21 +277,15 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         sreq.actualShards = sreq.shards;
         sreq.params = params;
 
-        if (isLegacyCloud) {
-          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
-        } else {
-          coresToCreate.put(coreName, sreq);
-        }
+        coresToCreate.put(coreName, sreq);
       }
 
-      if(!isLegacyCloud) {
-        // wait for all replica entries to be created
-        Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
-        for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
-          ShardRequest sreq = e.getValue();
-          sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
-          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
-        }
+      // wait for all replica entries to be created
+      Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+      for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
+        ShardRequest sreq = e.getValue();
+        sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+        shardHandler.submit(sreq, sreq.shards[0], sreq.params);
       }
 
       shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 007fbec..07ce33d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -220,7 +220,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         .put(MOCK_COLL_TASK, this::mockOperation)
         .put(MOCK_SHARD_TASK, this::mockOperation)
         .put(MOCK_REPLICA_TASK, this::mockOperation)
-        .put(MIGRATESTATEFORMAT, this::migrateStateFormat)
         .put(CREATESHARD, new CreateShardCmd(this))
         .put(MIGRATE, new MigrateCmd(this))
         .put(CREATE, new CreateCollectionCmd(this))
@@ -471,37 +470,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     }
   }
 
-
-  //TODO should we not remove in the next release ?
-  @SuppressWarnings({"unchecked"})
-  private void migrateStateFormat(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
-    final String collectionName = message.getStr(COLLECTION_PROP);
-
-    boolean firstLoop = true;
-    // wait for a while until the state format changes
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-    while (! timeout.hasTimedOut()) {
-      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-      if (collection == null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
-      }
-      if (collection.getStateFormat() == 2) {
-        // Done.
-        results.add("success", new SimpleOrderedMap<>());
-        return;
-      }
-
-      if (firstLoop) {
-        // Actually queue the migration command.
-        firstLoop = false;
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
-        overseer.offerStateUpdate(Utils.toJSON(m));
-      }
-      timeout.sleep(100);
-    }
-    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
-  }
-
   @SuppressWarnings({"unchecked"})
   void commit(@SuppressWarnings({"rawtypes"})NamedList results, String slice, Replica parentShardLeader) {
     log.debug("Calling soft commit to make sub shard updates visible");
@@ -632,8 +600,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     if(configName != null) {
       validateConfigOrThrowSolrException(configName);
 
-      boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
-      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
+      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName);
       reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
     }
 
@@ -729,7 +696,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
    * This doesn't validate the config (path) itself and is just responsible for creating the confNode.
    * That check should be done before the config node is created.
    */
-  public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
+  public static void createConfNode(DistribStateManager stateManager, String configName, String coll) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
 
     if (configName != null) {
       String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
@@ -741,11 +708,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
       }
     } else {
-      if(isLegacyCloud){
-        log.warn("Could not obtain config name");
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
-      }
+      throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index c0fc491..8eaf8f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -322,7 +322,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
 
       propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
       propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
-      propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
       if (rf != null) {
         propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
       }
@@ -348,7 +347,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           CommonParams.NAME, chkCollection,
           ZkStateReader.NUM_SHARDS_PROP, "1",
           ZkStateReader.REPLICATION_FACTOR, "1",
-          DocCollection.STATE_FORMAT, "2",
           CollectionAdminParams.COLL_CONF, "_default",
           CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
       );
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index f314ebb..c7b5aa1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -64,7 +64,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
@@ -108,6 +107,14 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
     Properties properties = backupMgr.readBackupProperties(location, backupName);
     String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
+
+    // Check whether the backup comes from a stateFormat=1 collection (i.e. not 2), a format only supported before Solr 9; such a backup can't be restored.
+    Object format = properties.get("stateFormat");
+    if (format != null && !"2".equals(format)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection " + backupCollection + " is in stateFormat=" + format +
+          " no longer supported in Solr 9 and above. It can't be restored. If it originates in Solr 8 you can restore" +
+          " it there, migrate it to stateFormat=2 and backup again, it will then be restorable on Solr 9");
+    }
     String backupCollectionAlias = properties.getProperty(BackupManager.COLLECTION_ALIAS_PROP);
     DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
 
@@ -161,9 +168,6 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
       Map<String, Object> propMap = new HashMap<>();
       propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
       propMap.put("fromApi", "true"); // mostly true.  Prevents autoCreated=true in the collection state.
-      if (properties.get(STATE_FORMAT) == null) {
-        propMap.put(STATE_FORMAT, "2");
-      }
       propMap.put(REPLICATION_FACTOR, numNrtReplicas);
       propMap.put(NRT_REPLICAS, numNrtReplicas);
       propMap.put(TLOG_REPLICAS, numTlogReplicas);
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 7e5343d..6943f2c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -233,7 +233,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
           Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
           DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
           String path = ZkStateReader.getCollectionPath(name);
-          coll = new DocCollection(name, slices, collProps, router, zkVersion + 1, path);
+          coll = new DocCollection(name, slices, collProps, router, zkVersion + 1);
           try {
             SimDistribStateManager stateManager = cloudManager.getSimDistribStateManager();
             byte[] data = Utils.toJSON(Collections.singletonMap(name, coll));
@@ -2510,7 +2510,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       lock.lockInterruptibly();
       try {
         Map<String, DocCollection> states = getCollectionStates();
-        ClusterState state = new ClusterState(0, liveNodes.get(), states);
+        ClusterState state = new ClusterState(liveNodes.get(), states);
         return state;
       } finally {
         lock.unlock();
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
index e011b4c..af385c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.noggit.CharArr;
 import org.noggit.JSONWriter;
@@ -47,7 +46,7 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
   public SnapshotClusterStateProvider(ClusterStateProvider other) throws Exception {
     liveNodes = Set.copyOf(other.getLiveNodes());
     ClusterState otherState = other.getClusterState();
-    clusterState = new ClusterState(otherState.getZNodeVersion(), liveNodes, otherState.getCollectionsMap());
+    clusterState = new ClusterState(liveNodes, otherState.getCollectionsMap());
     clusterProperties = new HashMap<>(other.getClusterProperties());
   }
 
@@ -69,15 +68,13 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
         collMap = mutableState;
         mutableState = Collections.singletonMap(name, state);
       }
-      Integer version = Integer.parseInt(String.valueOf(collMap.getOrDefault("zNodeVersion", stateVersion)));
-      String path = String.valueOf(collMap.getOrDefault("zNode", ZkStateReader.getCollectionPath(name)));
+      int version = Integer.parseInt(String.valueOf(collMap.getOrDefault("zNodeVersion", stateVersion)));
       collMap.remove("zNodeVersion");
-      collMap.remove("zNode");
       byte[] data = Utils.toJSON(mutableState);
-      ClusterState collState = ClusterState.load(version, data, Collections.emptySet(), path);
+      ClusterState collState = ClusterState.createFromJson(version, data, Collections.emptySet());
       collectionStates.put(name, collState.getCollection(name));
     });
-    clusterState = new ClusterState(stateVersion, liveNodes, collectionStates);
+    clusterState = new ClusterState(liveNodes, collectionStates);
   }
 
   public Map<String, Object> getSnapshot() {
@@ -97,7 +94,6 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
         @SuppressWarnings({"unchecked"})
         Map<String, Object> collMap = new LinkedHashMap<>((Map<String, Object>)Utils.fromJSON(json.getBytes("UTF-8")));
         collMap.put("zNodeVersion", coll.getZNodeVersion());
-        collMap.put("zNode", coll.getZNode());
         // format compatible with the real /state.json, which uses a mini-ClusterState
         // consisting of a single collection
         stateMap.put(coll.getName(), Collections.singletonMap(coll.getName(), collMap));
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index 397960f..630166a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -108,12 +108,7 @@ public class ClusterStateMutator {
       collectionProps.put("autoCreated", "true");
     }
 
-    //TODO default to 2; but need to debug why BasicDistributedZk2Test fails early on
-    String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
-        : ZkStateReader.getCollectionPath(cName);
-
-    DocCollection newCollection = new DocCollection(cName,
-        slices, collectionProps, router, -1, znode);
+    DocCollection newCollection = new DocCollection(cName, slices, collectionProps, router, -1);
 
     return new ZkWriteCommand(cName, newCollection);
   }
@@ -189,16 +184,5 @@ public class ClusterStateMutator {
     }
     return null;
   }
-
-  public ZkWriteCommand migrateStateFormat(ClusterState clusterState, ZkNodeProps message) {
-    final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-    if (!CollectionMutator.checkKeyExistence(message, ZkStateReader.COLLECTION_PROP)) return ZkStateWriter.NO_OP;
-    DocCollection coll = clusterState.getCollectionOrNull(collection);
-    if (coll == null || coll.getStateFormat() == 2) return ZkStateWriter.NO_OP;
-
-    return new ZkWriteCommand(coll.getName(),
-        new DocCollection(coll.getName(), coll.getSlicesMap(), coll.getProperties(), coll.getRouter(), 0,
-            ZkStateReader.getCollectionPath(collection)));
-  }
 }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 1c2be1b..fb783c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -136,7 +136,7 @@ public class CollectionMutator {
     }
 
     return new ZkWriteCommand(coll.getName(),
-        new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getZNode()));
+        new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion()));
   }
 
   public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 7891cc1..769be53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -203,12 +203,26 @@ public class ReplicaMutator {
     return new ZkWriteCommand(collectionName, newCollection);
   }
 
+  /**
+   * Handles state updates
+   */
   public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
-    if (Overseer.isLegacy(cloudManager.getClusterStateProvider())) {
-      return updateState(clusterState, message);
-    } else {
-      return updateStateNew(clusterState, message);
+    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
+    String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+    if (collectionName == null || sliceName == null) {
+      log.error("Invalid collection and slice {}", message);
+      return ZkStateWriter.NO_OP;
     }
+    DocCollection collection = clusterState.getCollectionOrNull(collectionName);
+    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
+    if (slice == null) {
+      log.error("No such slice exists {}", message);
+      return ZkStateWriter.NO_OP;
+    }
+
+    return updateState(clusterState, message);
   }
 
   protected ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message) {
@@ -355,28 +369,6 @@ public class ReplicaMutator {
     return new ZkWriteCommand(collectionName, newCollection);
   }
 
-  /**
-   * Handles non-legacy state updates
-   */
-  protected ZkWriteCommand updateStateNew(ClusterState clusterState, final ZkNodeProps message) {
-    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-    if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
-    String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-
-    if (collectionName == null || sliceName == null) {
-      log.error("Invalid collection and slice {}", message);
-      return ZkStateWriter.NO_OP;
-    }
-    DocCollection collection = clusterState.getCollectionOrNull(collectionName);
-    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
-    if (slice == null) {
-      log.error("No such slice exists {}", message);
-      return ZkStateWriter.NO_OP;
-    }
-
-    return updateState(clusterState, message);
-  }
-
   private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
     Slice slice = collection.getSlice(sliceName);
     Map<String, Object> sliceProps = slice.getProperties();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index cb89371..155fbc2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -38,9 +38,8 @@ import org.slf4j.LoggerFactory;
 import static java.util.Collections.singletonMap;
 
 /**
- * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for
- * both stateFormat=1 collection (stored in shared /clusterstate.json in ZK) and stateFormat=2 collections
- * each of which get their own individual state.json in ZK.
+ * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for collections
+ * each of which gets its own individual state.json in ZK.
  *
  * Updates to the cluster state are specified using the
  * {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} method. The class buffers updates
@@ -67,7 +66,6 @@ public class ZkStateWriter {
   protected Map<String, DocCollection> updates = new HashMap<>();
   private int numUpdates = 0;
   protected ClusterState clusterState = null;
-  protected boolean isClusterStateModified = false;
   protected long lastUpdatedTime = 0;
 
   /**
@@ -115,14 +113,9 @@ public class ZkStateWriter {
 
     for (ZkWriteCommand cmd : cmds) {
       if (cmd == NO_OP) continue;
-      if (!isClusterStateModified && clusterStateGetModifiedWith(cmd, prevState)) {
-        isClusterStateModified = true;
-      }
       prevState = prevState.copyWith(cmd.name, cmd.collection);
-      if (cmd.collection == null || cmd.collection.getStateFormat() != 1) {
-        updates.put(cmd.name, cmd.collection);
-        numUpdates++;
-      }
+      updates.put(cmd.name, cmd.collection);
+      numUpdates++;
     }
     clusterState = prevState;
 
@@ -145,15 +138,6 @@ public class ZkStateWriter {
   }
 
   /**
-   * Check whether {@value ZkStateReader#CLUSTER_STATE} (for stateFormat = 1) get changed given command
-   */
-  private boolean clusterStateGetModifiedWith(ZkWriteCommand command, ClusterState state) {
-    DocCollection previousCollection = state.getCollectionOrNull(command.name);
-    boolean wasPreviouslyStateFormat1 = previousCollection != null && previousCollection.getStateFormat() == 1;
-    boolean isCurrentlyStateFormat1 = command.collection != null && command.collection.getStateFormat() == 1;
-    return wasPreviouslyStateFormat1 || isCurrentlyStateFormat1;
-  }
-  /**
    * Logic to decide a flush after processing a list of ZkWriteCommand
    *
    * @return true if a flush to ZK is required, false otherwise
@@ -163,7 +147,7 @@ public class ZkStateWriter {
   }
 
   public boolean hasPendingUpdates() {
-    return numUpdates != 0 || isClusterStateModified;
+    return numUpdates != 0;
   }
 
   /**
@@ -192,23 +176,21 @@ public class ZkStateWriter {
             // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
             log.debug("going to delete state.json {}", path);
             reader.getZkClient().clean(path);
-          } else if (c.getStateFormat() > 1) {
+          } else {
             byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
             if (reader.getZkClient().exists(path, true)) {
               if (log.isDebugEnabled()) {
                 log.debug("going to update_collection {} version: {}", path, c.getZNodeVersion());
               }
               Stat stat = reader.getZkClient().setData(path, data, c.getZNodeVersion(), true);
-              DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion(), path);
+              DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), stat.getVersion());
               clusterState = clusterState.copyWith(name, newCollection);
             } else {
               log.debug("going to create_collection {}", path);
               reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
-              DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
+              DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0);
               clusterState = clusterState.copyWith(name, newCollection);
             }
-          } else if (c.getStateFormat() == 1) {
-            isClusterStateModified = true;
           }
         }
 
@@ -216,15 +198,6 @@ public class ZkStateWriter {
         numUpdates = 0;
       }
 
-      if (isClusterStateModified) {
-        assert clusterState.getZkClusterStateVersion() >= 0;
-        byte[] data = Utils.toJSON(clusterState);
-        Stat stat = reader.getZkClient().setData(ZkStateReader.CLUSTER_STATE, data, clusterState.getZkClusterStateVersion(), true);
-        Map<String, DocCollection> collections = clusterState.getCollectionsMap();
-        // use the reader's live nodes because our cluster state's live nodes may be stale
-        clusterState = new ClusterState(stat.getVersion(), reader.getClusterState().getLiveNodes(), collections);
-        isClusterStateModified = false;
-      }
       lastUpdatedTime = System.nanoTime();
       success = true;
     } catch (KeeperException.BadVersionException bve) {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index c011a64..dd98ac6 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -64,7 +64,6 @@ import org.apache.solr.client.solrj.impl.SolrHttpClientContextBuilder.Credential
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
@@ -1204,11 +1203,8 @@ public class CoreContainer {
     boolean preExisitingZkEntry = false;
     try {
       if (getZkController() != null) {
-        if (!Overseer.isLegacy(getZkController().getZkStateReader())) {
-          if (cd.getCloudDescriptor().getCoreNodeName() == null) {
-            throw new SolrException(ErrorCode.SERVER_ERROR, "non legacy mode coreNodeName missing " + parameters.toString());
-
-          }
+        if (cd.getCloudDescriptor().getCoreNodeName() == null) {
+          throw new SolrException(ErrorCode.SERVER_ERROR, "coreNodeName missing " + parameters.toString());
         }
         preExisitingZkEntry = getZkController().checkIfCoreNodeNameAlreadyExists(cd);
       }
@@ -1990,43 +1986,6 @@ public class CoreContainer {
     return solrCores.getTransientCacheHandler();
   }
 
-
-  /**
-   * @param cd   CoreDescriptor, presumably a deficient one
-   * @param prop The property that needs to be repaired.
-   * @return true if we were able to successfuly perisist the repaired coreDescriptor, false otherwise.
-   * <p>
-   * See SOLR-11503, This can be removed when there's no chance we'll need to upgrade a
-   * Solr installation created with legacyCloud=true from 6.6.1 through 7.1
-   */
-  public boolean repairCoreProperty(CoreDescriptor cd, String prop) {
-    // So far, coreNodeName is the only property that we need to repair, this may get more complex as other properties
-    // are added.
-
-    if (CoreDescriptor.CORE_NODE_NAME.equals(prop) == false) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-          String.format(Locale.ROOT, "The only supported property for repair is currently [%s]",
-              CoreDescriptor.CORE_NODE_NAME));
-    }
-
-    // Try to read the coreNodeName from the cluster state.
-
-    String coreName = cd.getName();
-    DocCollection coll = getZkController().getZkStateReader().getClusterState().getCollection(cd.getCollectionName());
-    for (Replica rep : coll.getReplicas()) {
-      if (coreName.equals(rep.getCoreName())) {
-        log.warn("Core properties file for node {} found with no coreNodeName, attempting to repair with value {}. See SOLR-11503. {}"
-            , "This message should only appear if upgrading from collections created Solr 6.6.1 through 7.1."
-            , rep.getCoreName(), rep.getName());
-        cd.getCloudDescriptor().setCoreNodeName(rep.getName());
-        coresLocator.persist(this, cd);
-        return true;
-      }
-    }
-    log.error("Could not repair coreNodeName in core.properties file for core {}", coreName);
-    return false;
-  }
-
   /**
    * @param solrCore the core against which we check if there has been a tragic exception
    * @return whether this Solr core has tragic exception
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index b15bbfe..deae360 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -139,7 +139,7 @@ public class BackupManager {
     try (IndexInput is = repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
       byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
       is.readBytes(arr, 0, (int) is.length());
-      ClusterState c_state = ClusterState.load(-1, arr, Collections.emptySet());
+      ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet());
       return c_state.getCollection(collectionName);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index d502dec..2265c9b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -80,10 +79,6 @@ public class ClusterStatus {
 
     ClusterState clusterState = zkStateReader.getClusterState();
 
-    // convert cluster state into a map of writable types
-    byte[] bytes = Utils.toJSON(clusterState);
-    Map<String, Object> stateMap = (Map<String,Object>) Utils.fromJSON(bytes);
-
     String routeKey = message.getStr(ShardParams._ROUTE_);
     String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
 
@@ -135,13 +130,9 @@ public class ClusterStatus {
         requestedShards.addAll(Arrays.asList(paramShards));
       }
 
-      if (clusterStateCollection.getStateFormat() > 1) {
-        bytes = Utils.toJSON(clusterStateCollection);
+        byte[] bytes = Utils.toJSON(clusterStateCollection);
         Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
         collectionStatus = getCollectionStatus(docCollection, name, requestedShards);
-      } else {
-        collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, requestedShards);
-      }
 
       collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
       if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
@@ -166,8 +157,7 @@ public class ClusterStatus {
     clusterStatus.add("collections", collectionProps);
 
     // read cluster properties
-    @SuppressWarnings({"rawtypes"})
-    Map clusterProps = zkStateReader.getClusterProperties();
+    Map<String, Object> clusterProps = zkStateReader.getClusterProperties();
     if (clusterProps != null && !clusterProps.isEmpty())  {
       clusterStatus.add("properties", clusterProps);
     }
@@ -233,19 +223,17 @@ public class ClusterStatus {
 
   @SuppressWarnings("unchecked")
   protected void crossCheckReplicaStateWithLiveNodes(List<String> liveNodes, NamedList<Object> collectionProps) {
-    Iterator<Map.Entry<String,Object>> colls = collectionProps.iterator();
-    while (colls.hasNext()) {
-      Map.Entry<String,Object> next = colls.next();
-      Map<String,Object> collMap = (Map<String,Object>)next.getValue();
-      Map<String,Object> shards = (Map<String,Object>)collMap.get("shards");
+    for (Map.Entry<String, Object> next : collectionProps) {
+      Map<String, Object> collMap = (Map<String, Object>) next.getValue();
+      Map<String, Object> shards = (Map<String, Object>) collMap.get("shards");
       for (Object nextShard : shards.values()) {
-        Map<String,Object> shardMap = (Map<String,Object>)nextShard;
-        Map<String,Object> replicas = (Map<String,Object>)shardMap.get("replicas");
+        Map<String, Object> shardMap = (Map<String, Object>) nextShard;
+        Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas");
         for (Object nextReplica : replicas.values()) {
-          Map<String,Object> replicaMap = (Map<String,Object>)nextReplica;
+          Map<String, Object> replicaMap = (Map<String, Object>) nextReplica;
           if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
             // not down, so verify the node is live
-            String node_name = (String)replicaMap.get(ZkStateReader.NODE_NAME_PROP);
+            String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
             if (!liveNodes.contains(node_name)) {
               // node is not live, so this replica is actually down
               replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
@@ -255,6 +243,4 @@ public class ClusterStatus {
       }
     }
   }
-
-
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 3f9cba3..90b7625 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -101,7 +101,6 @@ public class ColStatus {
         continue;
       }
       SimpleOrderedMap<Object> colMap = new SimpleOrderedMap<>();
-      colMap.add("stateFormat", coll.getStateFormat());
       colMap.add("znodeVersion", coll.getZNodeVersion());
       Map<String, Object> props = new TreeMap<>(coll.getProperties());
       props.remove("shards");
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 09bcfa4..f443832 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -125,7 +125,6 @@ import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@@ -464,7 +463,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           CREATE_NODE_SET,
           CREATE_NODE_SET_SHUFFLE,
           SHARDS_PROP,
-          STATE_FORMAT,
           AUTO_ADD_REPLICAS,
           RULE,
           SNITCH,
@@ -476,8 +474,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           WITH_COLLECTION,
           ALIAS);
 
-      props.putIfAbsent(STATE_FORMAT, "2");
-
       if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) {
         //TODO: Remove this in 8.0 . Keep this for SolrJ client back-compat. See SOLR-11676 for more details
         int replicationFactor = Integer.parseInt((String) props.get(REPLICATION_FACTOR));
@@ -569,7 +565,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           CREATE_NODE_SET_SHUFFLE,
           AUTO_ADD_REPLICAS,
           "shards",
-          STATE_FORMAT,
           CommonParams.ROWS,
           CommonParams.Q,
           CommonParams.FL,
@@ -1067,8 +1062,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
       return m;
     }),
-    MIGRATESTATEFORMAT_OP(MIGRATESTATEFORMAT, (req, rsp, h) -> copy(req.getParams().required(), null, COLLECTION_PROP)),
-
     BACKUP_OP(BACKUP, (req, rsp, h) -> {
       req.getParams().required().check(NAME, COLLECTION_PROP);
 
@@ -1177,7 +1170,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
       // from CREATE_OP:
       copy(req.getParams(), params, COLL_CONF, REPLICATION_FACTOR, NRT_REPLICAS, TLOG_REPLICAS,
-          PULL_REPLICAS, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE);
+          PULL_REPLICAS, MAX_SHARDS_PER_NODE, AUTO_ADD_REPLICAS, CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE);
       copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);
       return params;
     }),
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
index e39a9eb..9ee7a92 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
@@ -262,7 +262,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     }
 
     /**
-     * Create a merged view of all collections (internal from /clusterstate.json and external from /collections/?/state.json
+     * Create a merged view of all collections from /collections/?/state.json
      */
     private synchronized List<String> getCollections(SolrZkClient zkClient) throws KeeperException, InterruptedException {
       if (cachedCollections == null) {
@@ -283,7 +283,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     /**
      * Gets the requested page of collections after applying filters and offsets.
      */
-    public PageOfCollections fetchPage(PageOfCollections page, SolrZkClient zkClient)
+    public void fetchPage(PageOfCollections page, SolrZkClient zkClient)
         throws KeeperException, InterruptedException {
 
 
@@ -305,8 +305,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       // status until reading all status objects from ZK
       if (page.filterType != FilterType.status)
         page.selectPage(children);
-
-      return page;
     }
 
     @Override
@@ -383,7 +381,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     String dumpS = params.get("dump");
     boolean dump = dumpS != null && dumpS.equals("true");
 
-    int start = params.getInt("start", 0);
+    int start = params.getInt("start", 0); // Note start ignored if rows not specified
     int rows = params.getInt("rows", -1);
 
     String filterType = params.get("filterType");
@@ -405,12 +403,19 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     printer.detail = detail;
     printer.dump = dump;
     boolean isGraphView = "graph".equals(params.get("view"));
-    printer.page = (isGraphView && "/clusterstate.json".equals(path))
-        ? new PageOfCollections(start, rows, type, filter) : null;
+    // There is no /clusterstate.json znode (removed in Solr 9), but we act as if there were one and return the collection listing.
+    // services.js must be changed too if this is cleaned up: the collection list is used by the Admin UI Cloud - Graph view.
+    boolean paginateCollections = (isGraphView && "/clusterstate.json".equals(path));
+    printer.page = paginateCollections ? new PageOfCollections(start, rows, type, filter) : null;
     printer.pagingSupport = pagingSupport;
 
     try {
-      printer.print(path);
+      if (paginateCollections) {
+        // List collections with pagination support; unlike a lookup of a normal ZK path, no znode-specific info is returned
+        printer.printPaginatedCollections();
+      } else {
+        printer.print(path);
+      }
     } finally {
       printer.close();
     }
@@ -432,7 +437,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     String keeperAddr; // the address we're connected to
 
     final BAOS baos = new BAOS();
-    final Writer out = new OutputStreamWriter(baos,  StandardCharsets.UTF_8);
+    final Writer out = new OutputStreamWriter(baos, StandardCharsets.UTF_8);
     SolrZkClient zkClient;
 
     PageOfCollections page;
@@ -453,7 +458,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       }
     }
 
-    // main entry point
+    // main entry point for printing from path
     void print(String path) throws IOException {
       if (zkClient == null) {
         return;
@@ -501,6 +506,90 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       out.write(chars.toString());
     }
 
+    // main entry point for printing collections
+    @SuppressWarnings("unchecked")
+    void printPaginatedCollections() throws IOException {
+      SortedMap<String, Object> collectionStates;
+      try {
+        // support paging of the collections graph view (in case there are many collections)
+        // fetch the requested page of collections and then retrieve the state for each
+        pagingSupport.fetchPage(page, zkClient);
+        // keep track of how many collections match the filter
+        boolean applyStatusFilter = (page.filterType == FilterType.status && page.filter != null);
+        List<String> matchesStatusFilter = applyStatusFilter ? new ArrayList<String>() : null;
+        Set<String> liveNodes = applyStatusFilter ?
+            zkController.getZkStateReader().getClusterState().getLiveNodes() : null;
+
+        collectionStates = new TreeMap<>(pagingSupport);
+        for (String collection : page.selected) {
+          // Get collection state from ZK
+          String collStatePath = String.format(Locale.ROOT, "/collections/%s/state.json", collection);
+          String childDataStr = null;
+          try {
+            byte[] childData = zkClient.getData(collStatePath, null, null, true);
+            if (childData != null)
+              childDataStr = (new BytesRef(childData)).utf8ToString();
+          } catch (KeeperException.NoNodeException nne) {
+            log.warn("State for collection {} not found.", collection);
+          } catch (Exception childErr) {
+            log.error("Failed to get {} due to", collStatePath, childErr);
+          }
+
+          if (childDataStr != null) {
+            Map<String, Object> extColl = (Map<String, Object>) Utils.fromJSONString(childDataStr);
+            Object collectionState = extColl.get(collection);
+
+            if (applyStatusFilter) {
+              // verify this collection matches the filtered state
+              if (page.matchesStatusFilter((Map<String, Object>) collectionState, liveNodes)) {
+                matchesStatusFilter.add(collection);
+                collectionStates.put(collection, collectionState);
+              }
+            } else {
+              collectionStates.put(collection, collectionState);
+            }
+          }
+        }
+
+        if (applyStatusFilter) {
+          // update the paged navigation info after applying the status filter
+          page.selectPage(matchesStatusFilter);
+
+          // rebuild the Map of state data
+          SortedMap<String, Object> map = new TreeMap<String, Object>(pagingSupport);
+          for (String next : page.selected)
+            map.put(next, collectionStates.get(next));
+          collectionStates = map;
+        }
+      } catch (KeeperException | InterruptedException e) {
+        writeError(500, e.toString());
+        return;
+      }
+
+      CharArr chars = new CharArr();
+      JSONWriter json = new JSONWriter(chars, 2);
+      json.startObject();
+
+      json.writeString("znode");
+      json.writeNameSeparator();
+      json.startObject();
+
+      // For some reason, the JSON is badly formed without this
+      writeKeyValue(json, PATH, "Undefined", true);
+
+      if (collectionStates != null) {
+        CharArr collectionOut = new CharArr();
+        new JSONWriter(collectionOut, 2).write(collectionStates);
+        writeKeyValue(json, "data", collectionOut.toString(), false);
+      }
+
+      writeKeyValue(json, "paging", page.getPagingHeader(), false);
+
+      json.endObject();
+      json.endObject();
+      out.write(chars.toString());
+    }
+
     void writeError(int code, String msg) throws IOException {
       throw new SolrException(ErrorCode.getErrorCode(code), msg);
       /*response.setStatus(code);
@@ -522,7 +611,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       out.write(chars.toString());*/
     }
 
-
     boolean printTree(JSONWriter json, String path) throws IOException {
       String label = path;
       if (!fullpath) {
@@ -624,7 +712,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       json.write(v);
     }
 
-    @SuppressWarnings("unchecked")
     boolean printZnode(JSONWriter json, String path) throws IOException {
       try {
         String dataStr = null;
@@ -639,95 +726,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
             dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
           }
         }
-        // support paging of the collections graph view (in case there are many collections)
-        if (page != null) {
-          // we've already pulled the data for /clusterstate.json from ZooKeeper above,
-          // but it needs to be parsed into a map so we can lookup collection states before
-          // trying to find them in the /collections/?/state.json znode
-          Map<String, Object> clusterstateJsonMap = null;
-          if (dataStr != null) {
-            try {
-              clusterstateJsonMap = (Map<String, Object>) Utils.fromJSONString(dataStr);
-            } catch (Exception e) {
-              throw new SolrException(ErrorCode.SERVER_ERROR,
-                  "Failed to parse /clusterstate.json from ZooKeeper due to: " + e, e);
-            }
-          } else {
-            clusterstateJsonMap = Utils.makeMap();
-          }
-
-          // fetch the requested page of collections and then retrieve the state for each 
-          page = pagingSupport.fetchPage(page, zkClient);
-          // keep track of how many collections match the filter
-          boolean applyStatusFilter =
-              (page.filterType == FilterType.status && page.filter != null);
-          List<String> matchesStatusFilter = applyStatusFilter ? new ArrayList<String>() : null;
-          Set<String> liveNodes = applyStatusFilter ?
-              zkController.getZkStateReader().getClusterState().getLiveNodes() : null;
-
-          SortedMap<String, Object> collectionStates = new TreeMap<String, Object>(pagingSupport);
-          for (String collection : page.selected) {
-            Object collectionState = clusterstateJsonMap.get(collection);
-            if (collectionState != null) {
-              // collection state was in /clusterstate.json
-              if (applyStatusFilter) {
-                // verify this collection matches the status filter
-                if (page.matchesStatusFilter((Map<String, Object>) collectionState, liveNodes)) {
-                  matchesStatusFilter.add(collection);
-                  collectionStates.put(collection, collectionState);
-                }
-              } else {
-                collectionStates.put(collection, collectionState);
-              }
-            } else {
-              // looks like an external collection ...
-              String collStatePath = String.format(Locale.ROOT, "/collections/%s/state.json", collection);
-              String childDataStr = null;
-              try {
-                byte[] childData = zkClient.getData(collStatePath, null, null, true);
-                if (childData != null)
-                  childDataStr = (new BytesRef(childData)).utf8ToString();
-              } catch (KeeperException.NoNodeException nne) {
-                log.warn("State for collection {} not found in /clusterstate.json or /collections/{}/state.json!"
-                    , collection, collection);
-              } catch (Exception childErr) {
-                log.error("Failed to get {} due to", collStatePath, childErr);
-              }
-
-              if (childDataStr != null) {
-                Map<String, Object> extColl = (Map<String, Object>) Utils.fromJSONString(childDataStr);
-                collectionState = extColl.get(collection);
-
-                if (applyStatusFilter) {
-                  // verify this collection matches the filtered state
-                  if (page.matchesStatusFilter((Map<String, Object>) collectionState, liveNodes)) {
-                    matchesStatusFilter.add(collection);
-                    collectionStates.put(collection, collectionState);
-                  }
-                } else {
-                  collectionStates.put(collection, collectionState);
-                }
-              }
-            }
-          }
-
-          if (applyStatusFilter) {
-            // update the paged navigation info after applying the status filter
-            page.selectPage(matchesStatusFilter);
-
-            // rebuild the Map of state data
-            SortedMap<String, Object> map = new TreeMap<String, Object>(pagingSupport);
-            for (String next : page.selected)
-              map.put(next, collectionStates.get(next));
-            collectionStates = map;
-          }
-
-          if (collectionStates != null) {
-            CharArr out = new CharArr();
-            new JSONWriter(out, 2).write(collectionStates);
-            dataStr = out.toString();
-          }
-        }
 
         json.writeString("znode");
         json.writeNameSeparator();
diff --git a/solr/core/src/java/org/apache/solr/util/SolrCLI.java b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
index 25a53fc..7260ec1 100755
--- a/solr/core/src/java/org/apache/solr/util/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
@@ -1082,7 +1082,6 @@ public class SolrCLI implements CLIO {
       Map<String, Object> results = new LinkedHashMap<>();
       if (withClusterState) {
         Map<String, Object> map = new LinkedHashMap<>();
-        map.put("znodeVersion", clusterState.getZNodeVersion());
         map.put("liveNodes", new TreeSet<>(clusterState.getLiveNodes()));
         map.put("collections", clusterState.getCollectionsMap());
         results.put("CLUSTERSTATE", map);
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
deleted file mode 100644
index d3fec26..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.util.Map;
-
-import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import org.apache.lucene.util.LuceneTestCase.Slow;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.core.SolrCore;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.request.SolrQueryRequest;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- * This test is not fully functional - the port registered is illegal - 
- * so you cannot hit this with http - a nice side benifit is that it will
- * detect if a node is trying to do an update to itself with http - it shouldn't
- * do that.
- */
-@Slow
-public class BasicZkTest extends AbstractZkTestCase {
-  
-  @BeforeClass
-  public static void beforeClass() {
-
-  }
-  
-  @Test
-  public void testBasic() throws Exception {
-    
-    // test using ZooKeeper
-    assertTrue("Not using ZooKeeper", h.getCoreContainer().isZooKeeperAware());
-    
-    // for the really slow/busy computer, we wait to make sure we have a leader before starting
-    h.getCoreContainer().getZkController().getZkStateReader().getLeaderUrl("collection1", "shard1", 30000);
-    
-    ZkController zkController = h.getCoreContainer().getZkController();
-
-    SolrCore core = h.getCore();
-
-    // test that we got the expected config, not just hardcoded defaults
-    assertNotNull(core.getRequestHandler("/mock"));
-
-    lrf.args.put(CommonParams.VERSION, "2.2");
-    assertQ("test query on empty index", request("qlkciyopsbgzyvkylsjhchghjrdf"),
-        "//result[@numFound='0']");
-
-    // test escaping of ";"
-    assertU("deleting 42 for no reason at all", delI("42"));
-    assertU("adding doc#42", adoc("id", "42", "val_s", "aa;bb"));
-    assertU("does commit work?", commit());
-
-    assertQ("backslash escaping semicolon", request("id:42 AND val_s:aa\\;bb"),
-        "//*[@numFound='1']", "//str[@name='id'][.='42']");
-
-    assertQ("quote escaping semicolon", request("id:42 AND val_s:\"aa;bb\""),
-        "//*[@numFound='1']", "//str[@name='id'][.='42']");
-
-    assertQ("no escaping semicolon", request("id:42 AND val_s:aa"),
-        "//*[@numFound='0']");
-
-    assertU(delI("42"));
-    assertU(commit());
-    assertQ(request("id:42"), "//*[@numFound='0']");
-
-    // test overwrite default of true
-
-    assertU(adoc("id", "42", "val_s", "AAA"));
-    assertU(adoc("id", "42", "val_s", "BBB"));
-    assertU(commit());
-    assertQ(request("id:42"), "//*[@numFound='1']", "//str[.='BBB']");
-    assertU(adoc("id", "42", "val_s", "CCC"));
-    assertU(adoc("id", "42", "val_s", "DDD"));
-    assertU(commit());
-    assertQ(request("id:42"), "//*[@numFound='1']", "//str[.='DDD']");
-
-    // test deletes
-    String[] adds = new String[] { add(doc("id", "101"), "overwrite", "true"),
-        add(doc("id", "101"), "overwrite", "true"),
-        add(doc("id", "105"), "overwrite", "false"),
-        add(doc("id", "102"), "overwrite", "true"),
-        add(doc("id", "103"), "overwrite", "false"),
-        add(doc("id", "101"), "overwrite", "true"), };
-    for (String a : adds) {
-      assertU(a, a);
-    }
-    assertU(commit());
-    int zkPort = zkServer.getPort();
-
-    zkServer.shutdown();
-
-    // document indexing shouldn't stop immediately after a ZK disconnect
-    assertU(adoc("id", "201"));
-
-    Thread.sleep(300);
-    
-    // try a reconnect from disconnect
-    zkServer = new ZkTestServer(zkDir, zkPort);
-    zkServer.run(false);
-    
-    Thread.sleep(300);
-    
-    // ensure zk still thinks node is up
-    assertTrue(
-        zkController.getClusterState().getLiveNodes().toString(),
-        zkController.getClusterState().liveNodesContain(
-            zkController.getNodeName()));
-
-    // test maxint
-    assertQ(request("q", "id:[100 TO 110]", "rows", "2147483647"),
-        "//*[@numFound='4']");
-
-    // test big limit
-    assertQ(request("q", "id:[100 TO 111]", "rows", "1147483647"),
-        "//*[@numFound='4']");
-
-    assertQ(request("id:[100 TO 110]"), "//*[@numFound='4']");
-    assertU(delI("102"));
-    assertU(commit());
-    assertQ(request("id:[100 TO 110]"), "//*[@numFound='3']");
-    assertU(delI("105"));
-    assertU(commit());
-    assertQ(request("id:[100 TO 110]"), "//*[@numFound='2']");
-    assertU(delQ("id:[100 TO 110]"));
-    assertU(commit());
-    assertQ(request("id:[100 TO 110]"), "//*[@numFound='0']");
-
-
-
-    // SOLR-2651: test that reload still gets config files from zookeeper 
-    zkController.getZkClient().setData("/configs/conf1/solrconfig.xml", new byte[0], true);
- 
-    // we set the solrconfig to nothing, so this reload should fail
-    SolrException e = expectThrows(SolrException.class,
-        "The reloaded SolrCore did not pick up configs from zookeeper",
-        () -> {
-      ignoreException("solrconfig.xml");
-      h.getCoreContainer().reload(h.getCore().getName());
-    });
-    resetExceptionIgnores();
-    assertTrue(e.getMessage().contains("Unable to reload core [collection1]"));
-    assertTrue(e.getCause().getMessage().contains("Error loading solr config from solrconfig.xml"));
-    
-    // test stats call
-    Map<String, Metric> metrics = h.getCore().getCoreMetricManager().getRegistry().getMetrics();
-    assertEquals("collection1", ((Gauge)metrics.get("CORE.coreName")).getValue());
-    assertEquals("collection1", ((Gauge)metrics.get("CORE.collection")).getValue());
-    assertEquals("shard1", ((Gauge)metrics.get("CORE.shard")).getValue());
-    assertTrue(metrics.get("CORE.refCount") != null);
-
-    //zkController.getZkClient().printLayoutToStdOut();
-  }
-  
-  public SolrQueryRequest request(String... q) {
-    LocalSolrQueryRequest req = lrf.makeRequest(q);
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.add(req.getParams());
-    params.set("distrib", false);
-    req.setParams(params);
-    return req;
-  }
-}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
index f41d80a..87629d2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
@@ -184,7 +184,7 @@ public class ClusterStateMockUtil {
       }
     }
 
-    ClusterState clusterState = new ClusterState(1, new HashSet<>(Arrays.asList(liveNodes)), collectionStates);
+    ClusterState clusterState = new ClusterState(new HashSet<>(Arrays.asList(liveNodes)), collectionStates);
     MockZkStateReader reader = new MockZkStateReader(clusterState, collectionStates.keySet());
 
     String json;
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
index 5606c5b..f6e74da 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
@@ -56,10 +56,10 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
     collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
     collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
 
-    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
     // System.out.println("#################### " + new String(bytes));
-    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
@@ -67,13 +67,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
     assertEquals("Properties not copied properly", replica.getStr("prop1"), loadedClusterState.getCollection("collection1").getSlice("shard1").getReplicasMap().get("node1").getStr("prop1"));
     assertEquals("Properties not copied properly", replica.getStr("prop2"), loadedClusterState.getCollection("collection1").getSlice("shard1").getReplicasMap().get("node1").getStr("prop2"));
 
-    loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes);
+    loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
     assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size());
 
-    loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes);
+    loadedClusterState = ClusterState.createFromJson(-1, (byte[])null, liveNodes);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
index 050ac34..1ffa4d7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
@@ -50,11 +50,7 @@ public class CollectionPropsTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupClass() throws Exception {
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-
     configureCluster(4)
-        .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionStateFormat2Test.java b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
similarity index 89%
rename from solr/core/src/test/org/apache/solr/cloud/CollectionStateFormat2Test.java
rename to solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
index 04da1f5..6033e1e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionStateFormat2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
@@ -24,7 +24,7 @@ import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class CollectionStateFormat2Test extends SolrCloudTestCase {
+public class CollectionStateZnodeTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -48,7 +48,7 @@ public class CollectionStateFormat2Test extends SolrCloudTestCase {
     cluster.waitForActiveCollection(collectionName, 2, 4);
     
     waitForState("Collection not created", collectionName, (n, c) -> DocCollection.isFullyActive(n, c, 2, 2));
-    assertTrue("State Format 2 collection path does not exist",
+    assertTrue("Collection path does not exist",
         zkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
 
     Stat stat = new Stat();
@@ -57,13 +57,12 @@ public class CollectionStateFormat2Test extends SolrCloudTestCase {
     DocCollection c = getCollectionState(collectionName);
 
     assertEquals("DocCollection version should equal the znode version", stat.getVersion(), c.getZNodeVersion() );
-    assertTrue("DocCollection#getStateFormat() must be > 1", c.getStateFormat() > 1);
 
     // remove collection
     CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
     waitForState("Collection not deleted", collectionName, (n, coll) -> coll == null);
 
-    assertFalse("collection state should not exist externally",
+    assertFalse("collection state should not exist",
         zkClient().exists(ZkStateReader.getCollectionPath(collectionName), true));
 
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 4db1152..a4d62fe 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -93,10 +93,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     
     // clear any persisted auto scaling configuration
     zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
-    
-    final ClusterProperties props = new ClusterProperties(zkClient());
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-    assertEquals("Cluster property was not unset", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), null);
   }
   
   @After
@@ -306,7 +302,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCreateAndDeleteCollection() throws Exception {
     String collectionName = "solrj_test";
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-        .setStateFormat(1)
         .process(cluster.getSolrClient());
 
     assertEquals(0, response.getStatus());
@@ -328,24 +323,21 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
     waitForState("Expected " + collectionName + " to disappear from cluster state", collectionName, (n, c) -> c == null);
 
-    // Test Creating a collection with new stateformat.
-    collectionName = "solrj_newstateformat";
+    // Test creating a new collection.
+    collectionName = "solrj_test2";
 
     response = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-        .setStateFormat(2)
         .process(cluster.getSolrClient());
     assertEquals(0, response.getStatus());
     assertTrue(response.isSuccess());
 
     waitForState("Expected " + collectionName + " to appear in cluster state", collectionName, (n, c) -> c != null);
-
   }
 
   @Test
   public void testCloudInfoInCoreStatus() throws IOException, SolrServerException {
     String collectionName = "corestatus_test";
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-        .setStateFormat(1)
         .process(cluster.getSolrClient());
 
     assertEquals(0, response.getStatus());
@@ -558,21 +550,21 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     // sanity check our expected default
     final ClusterProperties props = new ClusterProperties(zkClient());
     assertEquals("Expecting prop to default to unset, test needs upated",
-                 props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), null);
+                 props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), null);
     
-    CollectionAdminResponse response = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true")
+    CollectionAdminResponse response = CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "true")
       .process(cluster.getSolrClient());
     assertEquals(0, response.getStatus());
-    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), "true");
+    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), "true");
 
     // Unset ClusterProp that we set.
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-    assertEquals("Cluster property was not unset", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), null);
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null).process(cluster.getSolrClient());
+    assertEquals("Cluster property was not unset", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), null);
 
-    response = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
+    response = CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false")
         .process(cluster.getSolrClient());
     assertEquals(0, response.getStatus());
-    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), "false");
+    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), "false");
   }
 
   @Test
diff --git a/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java b/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
index 9833e90..afb13b2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
@@ -153,7 +153,6 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
     assertEquals(1, coll.getNumTlogReplicas().intValue()); // per-shard
     assertEquals(1, coll.getNumPullReplicas().intValue()); // per-shard
     assertEquals(4, coll.getMaxShardsPerNode());
-    //TODO SOLR-11877 assertEquals(2, coll.getStateFormat());
     assertTrue("nodeSet didn't work?",
         coll.getSlices().stream().flatMap(s -> s.getReplicas().stream())
             .map(Replica::getNodeName).allMatch(createNode::equals));
@@ -200,7 +199,6 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
     assertEquals("foo_s", ((Map)coll.get("router")).get("field"));
     assertEquals(1, coll.getSlices().size()); // numShards
     assertEquals(2, coll.getReplicationFactor().intValue()); // num replicas
-    //TODO SOLR-11877 assertEquals(2, coll.getStateFormat());
 
     // Test Alias metadata
     Aliases aliases = cluster.getSolrClient().getZkStateReader().getAliases();
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
index 0c945e6..0edc7be 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
@@ -27,7 +27,6 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
@@ -46,7 +45,6 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
   public static void setupCluster() throws Exception {
     configureCluster(4)
         .addConfig("conf", configset("cloud-minimal"))
-        .withProperty(ZkStateReader.LEGACY_CLOUD, "false")
         .configure();
   }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index df36112..8340458 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -211,19 +211,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
   @Test
   public void deleteReplicaFromClusterState() throws Exception {
-    deleteReplicaFromClusterState("false");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-  
-  @Test
-  public void deleteReplicaFromClusterStateLegacy() throws Exception {
-    deleteReplicaFromClusterState("true"); 
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-
-  private void deleteReplicaFromClusterState(String legacyCloud) throws Exception {
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
-    final String collectionName = "deleteFromClusterState_"+legacyCloud;
+    final String collectionName = "deleteFromClusterStateCollection";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3)
         .process(cluster.getSolrClient());
     
@@ -237,7 +225,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
     Slice shard = getCollectionState(collectionName).getSlice("shard1");
 
-    // don't choose the leader to shutdown, it just complicates things unneccessarily
+    // don't choose the leader to shutdown, it just complicates things unnecessarily
     Replica replica = getRandomReplica(shard, (r) ->
                                        ( r.getState() == Replica.State.ACTIVE &&
                                          ! r.equals(shard.getLeader())));
@@ -283,23 +271,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
   @Slow
   // commented out on: 17-Feb-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
   public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
-    raceConditionOnDeleteAndRegisterReplica("false");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-  
-  @Test
-  @Slow
-  // commented out on: 17-Feb-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void raceConditionOnDeleteAndRegisterReplicaLegacy() throws Exception {
-    raceConditionOnDeleteAndRegisterReplica("true");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-
-  // commented out on: 17-Feb-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void raceConditionOnDeleteAndRegisterReplica(String legacyCloud) throws Exception {
-    
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
-    final String collectionName = "raceDeleteReplica_"+legacyCloud;
+    final String collectionName = "raceDeleteReplicaCollection";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
         .process(cluster.getSolrClient());
     
diff --git a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java b/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
deleted file mode 100644
index f697204..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Properties;
-
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.common.cloud.ClusterProperties;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.CorePropertiesLocator;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class LegacyCloudClusterPropTest extends SolrCloudTestCase {
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-
-    // currently this test is fine with a single shard with a single replica and it's simpler. Could easily be
-    // extended to multiple shards/replicas, but there's no particular need.
-    configureCluster(1)
-        .addConfig("conf", configset("cloud-minimal"))
-        .configure();
-  }
-  
-  @After
-  public void afterTest() throws Exception {
-    cluster.deleteAllCollections();
-  }
-
-
-  // Are all these required?
-  private static String[] requiredProps = {
-      "numShards",
-      "collection.configName",
-      "name",
-      "replicaType",
-      "shard",
-      "collection",
-      "coreNodeName"
-  };
-
-  @Test
-  //2018-06-18 (commented) @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
-  //Commented 14-Oct-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 17-Aug-2018
-  // commented out on: 01-Apr-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void testCreateCollectionSwitchLegacyCloud() throws Exception {
-    createAndTest("legacyTrue", true);
-    createAndTest("legacyFalse", false);
-  }
-
-  private void createAndTest(final String coll, final boolean legacy) throws Exception {
-
-    // First, just insure that core.properties file gets created with coreNodeName and all other mandatory parameters.
-    final String legacyString = Boolean.toString(legacy);
-    final String legacyAnti = Boolean.toString(!legacy);
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyString).process(cluster.getSolrClient());
-    ClusterProperties props = new ClusterProperties(zkClient());
-
-    assertEquals("Value of legacyCloud cluster prop unexpected", legacyString,
-        props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyAnti));
-
-    CollectionAdminRequest.createCollection(coll, "conf", 1, 1)
-        .setMaxShardsPerNode(1)
-        .process(cluster.getSolrClient());
-    
-    cluster.waitForActiveCollection(coll, 1, 1);
-    
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 120000));
-    
-    // Insure all mandatory properties are there.
-    checkMandatoryProps(coll);
-
-    checkCollectionActive(coll);
-    // The fixes for SOLR-11503 insure that creating a collection has coreNodeName whether legacyCloud is true or false,
-    // we still need to test repairing a properties file that does _not_ have coreNodeName set, the second part of
-    // the fix.
-
-    // First, remove the coreNodeName from cluster.properties and write it out it.
-    removePropertyFromAllReplicas(coll, "coreNodeName");
-
-    // Now restart Solr, this should repair the removal on core load no matter the value of legacyCloud
-    JettySolrRunner jetty = cluster.getJettySolrRunner(0);
-    jetty.stop();
-    
-    cluster.waitForJettyToStop(jetty);
-    
-    jetty.start();
-    
-    cluster.waitForAllNodes(30);
-    
-    checkMandatoryProps(coll);
-    checkCollectionActive(coll);
-  }
-
-  private void checkCollectionActive(String coll) {
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 120000));
-    DocCollection docColl = getCollectionState(coll);
-    for (Replica rep : docColl.getReplicas()) {
-      if (rep.getState() == Replica.State.ACTIVE) return;
-    }
-    fail("Replica was not active for collection " + coll);
-  }
-  private void removePropertyFromAllReplicas(String coll, String propDel) throws IOException {
-    DocCollection docColl = getCollectionState(coll);
-
-    // First remove the property from all core.properties files
-    for (Replica rep : docColl.getReplicas()) {
-      final String coreName = rep.getCoreName();
-      Properties prop = loadPropFileForReplica(coreName);
-      prop.remove(propDel);
-      JettySolrRunner jetty = cluster.getJettySolrRunner(0);
-      Path expected = Paths.get(jetty.getSolrHome()).toAbsolutePath().resolve(coreName);
-      Path corePropFile = Paths.get(expected.toString(), CorePropertiesLocator.PROPERTIES_FILENAME);
-
-      try (Writer os = new OutputStreamWriter(Files.newOutputStream(corePropFile), StandardCharsets.UTF_8)) {
-        prop.store(os, "");
-      }
-    }
-
-    // Now insure it's really gone
-    for (Replica rep : docColl.getReplicas()) {
-      Properties prop = loadPropFileForReplica(rep.getCoreName());
-      assertEquals("Property " + propDel + " should have been deleted",
-          "bogus", prop.getProperty(propDel, "bogus"));
-    }
-  }
-
-  private Properties loadPropFileForReplica(String coreName) throws IOException {
-    JettySolrRunner jetty = cluster.getJettySolrRunner(0);
-    Path expected = Paths.get(jetty.getSolrHome()).toAbsolutePath().resolve(coreName);
-    Path corePropFile = Paths.get(expected.toString(), CorePropertiesLocator.PROPERTIES_FILENAME);
-    Properties props = new Properties();
-    try (InputStream fis = Files.newInputStream(corePropFile)) {
-      props.load(new InputStreamReader(fis, StandardCharsets.UTF_8));
-    }
-    return props;
-  }
-
-  private void checkMandatoryProps(String coll) throws IOException {
-    DocCollection docColl = getCollectionState(coll);
-    for (Replica rep : docColl.getReplicas()) {
-      Properties prop = loadPropFileForReplica(rep.getCoreName());      for (String testProp : requiredProps) {
-        String propVal = prop.getProperty(testProp, "bogus");
-        if ("bogus".equals(propVal)) {
-          fail("Should have found property " + testProp + " in properties file");
-        }
-      }
-    }
-  }
-}
diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
index d5439d1..edd23b5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
@@ -50,11 +50,6 @@ public class MigrateRouteKeyTest extends SolrCloudTestCase {
     configureCluster(2)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
-
-    if (usually()) {
-      CollectionAdminRequest.setClusterProperty("legacyCloud", "false").process(cluster.getSolrClient());
-      log.info("Using legacyCloud=false for cluster");
-    }
   }
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index fc60b5d..6602fac 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -328,8 +328,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
       when(zkStateReaderMock.getBaseUrlForNodeName(address)).thenAnswer(invocation -> address.replaceAll("_", "/"));
     }
 
-    when(zkStateReaderMock.getClusterProperty("legacyCloud", "false")).thenReturn("false");
-
     when(solrZkClientMock.getZkClientTimeout()).thenReturn(30000);
     
     when(clusterStateMock.hasCollection(anyString())).thenAnswer(invocation -> {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 255d199..029103a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -88,7 +88,6 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event;
-import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -181,16 +180,21 @@ public class OverseerTest extends SolrTestCaseJ4 {
       zkStateReader.close();
     }
 
+    /**
+     * Create a collection.
+     * Note there's a similar but slightly different {@link OverseerTest#createCollection(String, int)}.
+     */
     public void createCollection(String collection, int numShards) throws Exception {
+      // Create collection znode before having ClusterStateUpdater create state.json below it or it will fail.
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true);
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           "name", collection,
           ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, numShards+"",
+          ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
           "createNodeSet", "");
       ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
       q.offer(Utils.toJSON(m));
-
     }
 
     public String publishState(String collection, String coreName, String coreNodeName, String shard, Replica.State stateName, int numShards, boolean startElection, Overseer overseer)
@@ -375,6 +379,23 @@ public class OverseerTest extends SolrTestCaseJ4 {
     super.tearDown();
   }
 
+  /**
+   * This method creates a collection. It is different from {@link MockZKController#createCollection(String, int)} in
+   * the way the {@link ZkDistributedQueue} is obtained.
+   */
+  private void createCollection(String collection, int numShards) throws Exception {
+    // Create collection znode before having ClusterStateUpdater create state.json below it or it will fail.
+    zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true);
+
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+        "name", collection,
+        ZkStateReader.REPLICATION_FACTOR, "1",
+        ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
+        "createNodeSet", "");
+    ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+    q.offer(Utils.toJSON(m));
+  }
+
   @Test
   public void testShardAssignment() throws Exception {
 
@@ -382,8 +403,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
     SolrZkClient overseerClient = null;
 
     try {
-
-
       ZkController.createClusterZkNodes(zkClient);
 
       overseerClient = electNewOverseer(server.getZkAddress());
@@ -393,15 +412,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
         mockController = new MockZKController(server.getZkAddress(), "127.0.0.1", overseers);
 
-        final int numShards = 6;
+        final int numShards = 6; // number of cores published below, not the number of shards in the collection (3)
 
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-            "name", COLLECTION,
-            ZkStateReader.REPLICATION_FACTOR, "1",
-            ZkStateReader.NUM_SHARDS_PROP, "3",
-            "createNodeSet", "");
-        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
-        q.offer(Utils.toJSON(m));
+        createCollection(COLLECTION, 3);
 
         for (int i = 0; i < numShards; i++) {
           assertNotNull("shard got no id?", mockController.publishState(COLLECTION, "core" + (i + 1), "node" + (i + 1), "shard" + ((i % 3) + 1), Replica.State.ACTIVE, 3, true, overseers.get(0)));
@@ -579,14 +592,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
 
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", COLLECTION,
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          "createNodeSet", "");
-      q.offer(Utils.toJSON(m));
+      createCollection(COLLECTION, 1);
 
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.NODE_NAME_PROP, "node1",
           ZkStateReader.COLLECTION_PROP, COLLECTION,
@@ -826,31 +834,19 @@ public class OverseerTest extends SolrTestCaseJ4 {
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      // We did not create /collections -> this message will cause exception when Overseer try to flush the clusterstate
+      // We did not create /collections/collection1 -> this message will cause an exception when the Overseer tries to
+      // flush the collection state
       ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           "name", "collection1",
           ZkStateReader.REPLICATION_FACTOR, "1",
           ZkStateReader.NUM_SHARDS_PROP, "1",
-          DocCollection.STATE_FORMAT, "2",
-          "createNodeSet", "");
-      ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "collection2",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          DocCollection.STATE_FORMAT, "1",
           "createNodeSet", "");
       ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(zkClient, new Stats());
       workQueue.offer(Utils.toJSON(badMessage));
-      workQueue.offer(Utils.toJSON(goodMessage));
       overseerClient = electNewOverseer(server.getZkAddress());
-      waitForCollections(reader, "collection2");
 
       ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
       q.offer(Utils.toJSON(badMessage));
-      q.offer(Utils.toJSON(goodMessage.plus("name", "collection3")));
-      waitForCollections(reader, "collection2", "collection3");
-      assertNotNull(reader.getClusterState().getCollectionOrNull("collection2"));
-      assertNotNull(reader.getClusterState().getCollectionOrNull("collection3"));
 
       TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
       while(!timeOut.hasTimedOut()) {
@@ -907,6 +903,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
      electNewOverseer(server.getZkAddress());
 
+      // Create collection znode before repeatedly trying to enqueue the Cluster state change message
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, true);
+
       for (int i = 0; i < atLeast(4); i++) {
         killCounter.incrementAndGet(); // for each round allow 1 kill
 
@@ -915,7 +914,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
         TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
         while (!timeout.hasTimedOut()) {
           try {
-            mockController.createCollection(COLLECTION, 1);
+            // We must only retry the enqueue to Overseer, not the collection znode creation (that doesn't depend on Overseer)
+            ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+                "name", COLLECTION,
+                ZkStateReader.REPLICATION_FACTOR, "1",
+                ZkStateReader.NUM_SHARDS_PROP, "1",
+                "createNodeSet", "");
+            ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
+            q.offer(Utils.toJSON(m));
             break;
           } catch (SolrException | KeeperException | AlreadyClosedException e) {
             e.printStackTrace();
@@ -1088,25 +1094,25 @@ public class OverseerTest extends SolrTestCaseJ4 {
     try {
 
       ZkController.createClusterZkNodes(zkClient);
+      overseerClient = electNewOverseer(server.getZkAddress());
 
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
       mockController = new MockZKController(server.getZkAddress(), "node1", overseers);
 
-      final int MAX_COLLECTIONS = 10, MAX_CORES = 10, MAX_STATE_CHANGES = 20000, STATE_FORMAT = 2;
+      final int MAX_COLLECTIONS = 10, MAX_CORES = 10, MAX_STATE_CHANGES = 20000;
 
       for (int i=0; i<MAX_COLLECTIONS; i++)  {
+        zkClient.makePath("/collections/perf" + i, true);
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
             "name", "perf" + i,
             ZkStateReader.NUM_SHARDS_PROP, "1",
-            "stateFormat", String.valueOf(STATE_FORMAT),
             ZkStateReader.REPLICATION_FACTOR, "1",
             ZkStateReader.MAX_SHARDS_PER_NODE, "1"
             );
         ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
         q.offer(Utils.toJSON(m));
-        zkClient.makePath("/collections/perf" + i, true);
       }
 
       for (int i = 0, j = 0, k = 0; i < MAX_STATE_CHANGES; i++, j++, k++) {
@@ -1114,11 +1120,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
             ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString(),
             ZkStateReader.NODE_NAME_PROP,  "node1",
             ZkStateReader.CORE_NAME_PROP, "core" + k,
+            ZkStateReader.SHARD_ID_PROP, "shard1",
             ZkStateReader.CORE_NODE_NAME_PROP, "node1",
             ZkStateReader.COLLECTION_PROP, "perf" + j,
             ZkStateReader.NUM_SHARDS_PROP, "1",
-            ZkStateReader.BASE_URL_PROP, "http://" +  "node1"
-            + "/solr/");
+            ZkStateReader.BASE_URL_PROP, "http://" + "node1" + "/solr/");
         ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
         q.offer(Utils.toJSON(m));
         if (j >= MAX_COLLECTIONS - 1) j = 0;
@@ -1126,30 +1132,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
         if (i > 0 && i % 100 == 0) log.info("Published {} items", i);
       }
 
-      // let's publish a sentinel collection which we'll use to wait for overseer to complete operations
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
-          ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString(),
-          ZkStateReader.NODE_NAME_PROP, "node1",
-          ZkStateReader.CORE_NAME_PROP, "core1",
-          ZkStateReader.CORE_NODE_NAME_PROP, "node1",
-          ZkStateReader.COLLECTION_PROP, "perf_sentinel",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          ZkStateReader.BASE_URL_PROP, "http://" + "node1"
-          + "/solr/");
-      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
-      q.offer(Utils.toJSON(m));
+      // let's create a sentinel collection which we'll use to wait for overseer to complete operations
+      createCollection("perf_sentinel", 1);
 
       Timer t = new Timer();
       Timer.Context context = t.time();
-      try {
-        overseerClient = electNewOverseer(server.getZkAddress());
-        assertTrue(overseers.size() > 0);
-
-        reader.waitForState("perf_sentinel", 15000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null);
-
-      } finally {
-        context.stop();
-      }
+      reader.waitForState("perf_sentinel", 15000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null);
+      context.stop();
 
       log.info("Overseer loop finished processing: ");
       printTimingStats(t);
@@ -1219,6 +1208,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
       //prepopulate work queue with some items to emulate previous overseer died before persisting state
       DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Stats());
 
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, true);
+
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           "name", COLLECTION,
           ZkStateReader.REPLICATION_FACTOR, "1",
@@ -1280,8 +1271,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       ZkController.createClusterZkNodes(zkClient);
 
-      zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true);
-
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
@@ -1289,15 +1278,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
 
+      createCollection("c1", 1);
 
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "c1",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          "createNodeSet", "");
-      q.offer(Utils.toJSON(m));
-
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.SHARD_ID_PROP, "shard1",
           ZkStateReader.NODE_NAME_PROP, "node1",
@@ -1335,28 +1318,32 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       q.offer(Utils.toJSON(m));
 
-      Stat stat = new Stat();
-      byte[] data = zkClient.getData("/clusterstate.json", null, stat, true);
-      // Simulate an external modification
-      zkClient.setData("/clusterstate.json", data, true);
-
+      final String testCollectionName = "test";
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + testCollectionName, true);
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "test",
+          "name", testCollectionName,
           ZkStateReader.NUM_SHARDS_PROP, "1",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          DocCollection.STATE_FORMAT, "2"
+          ZkStateReader.REPLICATION_FACTOR, "1"
       );
       q.offer(Utils.toJSON(m));
 
+      // Wait for the overseer to create state.json for the collection
+      waitForCollections(reader, testCollectionName);
+
+      final String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + testCollectionName + "/state.json";
+      byte[] data = zkClient.getData(path, null, null, true);
+      // Simulate an external modification of state.json
+      zkClient.setData(path, data, true);
+
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(),
-          "collection", "test",
+          "collection", testCollectionName,
           ZkStateReader.SHARD_ID_PROP, "x",
           ZkStateReader.REPLICATION_FACTOR, "1"
       );
       q.offer(Utils.toJSON(m));
 
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(),
-          "collection", "test",
+          "collection", testCollectionName,
           ZkStateReader.SHARD_ID_PROP, "x",
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.CORE_NODE_NAME_PROP, "core_node1",
@@ -1366,8 +1353,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
       );
       q.offer(Utils.toJSON(m));
 
-      waitForCollections(reader, "test");
-      verifyReplicaStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
+      // Verify replica creation worked ok in spite of external update of state.json (although in theory such updates
+      // do not happen unless an old overseer is still updating ZK after a new Overseer got elected...).
+      verifyReplicaStatus(reader, testCollectionName, "x", "core_node1", Replica.State.DOWN);
 
       waitForCollections(reader, "c1");
       verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
@@ -1483,6 +1471,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       // create collection
       {
         final Integer maxShardsPerNode = numReplicas * numShards;
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, true);
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
             "name", COLLECTION,
             ZkStateReader.NUM_SHARDS_PROP, numShards.toString(),
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java
index 9a97264..2c239a1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java
@@ -56,8 +56,6 @@ public class ShardRoutingCustomTest extends AbstractFullDistribZkTestBase {
   private void doCustomSharding() throws Exception {
     printLayout();
 
-  
-
     File jettyDir = createTempDir("jetty").toFile();
     jettyDir.mkdirs();
     setupJettySolrHome(jettyDir);
@@ -65,7 +63,6 @@ public class ShardRoutingCustomTest extends AbstractFullDistribZkTestBase {
     j.start();
     assertEquals(0, CollectionAdminRequest
         .createCollection(DEFAULT_COLLECTION, "conf1", 1, 1)
-        .setStateFormat(Integer.parseInt(getStateFormat()))
         .setCreateNodeSet("")
         .process(cloudClient).getStatus());
     assertTrue(CollectionAdminRequest
diff --git a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
index 29ba036..2f2217b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
@@ -116,11 +116,6 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
   public void setUp() throws Exception {
     super.setUp();
     collectionUlogDirMap.clear();
-    if (random().nextBoolean()) {
-      CollectionAdminRequest.setClusterProperty("legacyCloud", "false").process(cloudClient);
-    } else {
-      CollectionAdminRequest.setClusterProperty("legacyCloud", "true").process(cloudClient);
-    }
   }
   
   @Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
index cab5420..7a3d02a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
@@ -51,9 +51,9 @@ public class SliceStateTest extends SolrTestCaseJ4 {
     slices.put("shard1", slice);
     collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
 
-    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
-    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
 
     assertSame("Default state not set to active", Slice.State.ACTIVE, loadedClusterState.getCollection("collection1").getSlice("shard1").getState());
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java
index c082e37..aa272e5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java
@@ -41,13 +41,13 @@ public class TestClusterProperties extends SolrCloudTestCase {
 
   @Test
   public void testClusterProperties() throws Exception {
-    assertEquals("false", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false"));
+    assertEquals("false", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false"));
 
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true").process(cluster.getSolrClient());
-    assertEquals("true", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false"));
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "true").process(cluster.getSolrClient());
+    assertEquals("true", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false"));
 
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false").process(cluster.getSolrClient());
-    assertEquals("false", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "true"));
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false").process(cluster.getSolrClient());
+    assertEquals("false", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "true"));
   }
   
   @Test
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index f0af144..33e29e9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -89,11 +89,6 @@ public class TestPullReplica extends SolrCloudTestCase {
    configureCluster(2) // 2 + random().nextInt(3)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-    CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud));
-    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
-    assertEquals(0, response.getStatus());
   }
 
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
index 6a79c61..4ede3ea 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
@@ -36,7 +36,6 @@ import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
@@ -91,20 +90,6 @@ public class TestPullReplicaErrorHandling extends SolrCloudTestCase {
       proxies.put(proxy.getUrl(), proxy);
       jettys.put(proxy.getUrl(), jetty);
     }
-    TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-    while (true) {
-      try {
-        CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
-        CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
-        assertEquals(0, response.getStatus());
-        break;
-      } catch (SolrServerException e) {
-        Thread.sleep(50);
-        if (t.hasTimedOut()) {
-          throw e;
-        }
-      }
-    }
   }
   
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
index 8f7e27b..a48e32b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
@@ -92,11 +92,6 @@ public class TestTlogReplica extends SolrCloudTestCase {
     configureCluster(2) // 2 + random().nextInt(3)
         .addConfig("conf", configset("cloud-minimal-inplace-updates"))
         .configure();
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-    CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud));
-    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
-    assertEquals(0, response.getStatus());
   }
 
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestZkChroot.java b/solr/core/src/test/org/apache/solr/cloud/TestZkChroot.java
deleted file mode 100644
index 134e332..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/TestZkChroot.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import org.apache.solr.SolrJettyTestBase;
-import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.core.CoreContainer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestZkChroot extends SolrTestCaseJ4 {
-  protected CoreContainer cores = null;
-  private Path home;
-  
-  protected ZkTestServer zkServer;
-  protected Path zkDir;
-  
-  @Override
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-
-    zkDir = createTempDir("zkData");
-    zkServer = new ZkTestServer(zkDir);
-    zkServer.run();
-    home = Paths.get(SolrJettyTestBase.legacyExampleCollection1SolrHome());
-    
-  }
-  
-  @Override
-  @After
-  public void tearDown() throws Exception {
-    System.clearProperty("zkHost");
-    
-    if (cores != null) {
-      cores.shutdown();
-      cores = null;
-    }
-    
-    if (null != zkServer) {
-      zkServer.shutdown();
-      zkServer = null;
-    }
-    zkDir = null;
-    
-    super.tearDown();
-  }
-  
-  @Test
-  public void testChrootBootstrap() throws Exception {
-    String chroot = "/foo/bar";
-    
-    System.setProperty("bootstrap_conf", "true");
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-    SolrZkClient zkClient = null;
-    SolrZkClient zkClient2 = null;
-    
-    try {
-      cores = CoreContainer.createAndLoad(home);
-      zkClient = cores.getZkController().getZkClient();
-      
-      assertTrue(zkClient.exists("/clusterstate.json", true));
-      assertFalse(zkClient.exists(chroot + "/clusterstate.json", true));
-      
-      zkClient2 = new SolrZkClient(zkServer.getZkHost(),
-          AbstractZkTestCase.TIMEOUT);
-      assertTrue(zkClient2.exists(chroot + "/clusterstate.json", true));
-      assertFalse(zkClient2.exists("/clusterstate.json", true));
-    } finally {
-      if (zkClient != null) zkClient.close();
-      if (zkClient2 != null) zkClient2.close();
-    }
-  }
-  
-  @Test
-  public void testNoBootstrapConf() throws Exception {
-    String chroot = "/foo/bar2";
-    
-    System.setProperty("bootstrap_conf", "false");
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-
-    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
-      expectThrows(ZooKeeperException.class,
-          "did not get a top level exception when more then 4 updates failed",
-          () -> {
-        assertFalse("Path '" + chroot + "' should not exist before the test",
-            zkClient.exists(chroot, true));
-        cores = CoreContainer.createAndLoad(home);
-      });
-      assertFalse("Path shouldn't have been created",
-          zkClient.exists(chroot, true));// check the path was not created
-    }
-  }
-  
-  @Test
-  public void testWithUploadDir() throws Exception {
-    String chroot = "/foo/bar3";
-    String configName = "testWithUploadDir";
-    
-    System.setProperty("bootstrap_conf", "false");
-    System.setProperty("bootstrap_confdir", home + "/collection1/conf");
-    System.setProperty("collection.configName", configName);
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-
-    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
-      assertFalse("Path '" + chroot + "' should not exist before the test",
-          zkClient.exists(chroot, true));
-      cores = CoreContainer.createAndLoad(home);
-      assertTrue(
-          "solrconfig.xml should have been uploaded to zk to the correct config directory",
-          zkClient.exists(chroot + ZkConfigManager.CONFIGS_ZKNODE + "/"
-              + configName + "/solrconfig.xml", true));
-    }
-  }
-  
-  @Test
-  public void testInitPathExists() throws Exception {
-    String chroot = "/foo/bar4";
-    
-    System.setProperty("bootstrap_conf", "true");
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-
-    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT)) {
-      zkClient.makePath("/foo/bar4", true);
-      assertTrue(zkClient.exists(chroot, true));
-      assertFalse(zkClient.exists(chroot + "/clusterstate.json", true));
-      
-      cores = CoreContainer.createAndLoad(home);
-      assertTrue(zkClient.exists(chroot + "/clusterstate.json", true));
-    }
-  }
-}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 4526ed4..603c414 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.ClusterProperties;
-import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -284,7 +283,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
 
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
             CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.NUM_SHARDS_PROP, "1",
-            "name", collectionName, DocCollection.STATE_FORMAT, "2");
+            "name", collectionName);
         zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
 
         HashMap<String, Object> propMap = new HashMap<>();
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
index e4bb328..21a362b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
@@ -421,8 +421,6 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
       assertEquals(restoreCollectionName, backupCollection.getMaxShardsPerNode(), restoreCollection.getMaxShardsPerNode());
     }
 
-    assertEquals("Restore collection should use stateFormat=2", 2, restoreCollection.getStateFormat());
-
     //SOLR-12605: Add more docs after restore is complete to see if they are getting added fine
     //explicitly querying the leaders. If we use CloudSolrClient there is no guarantee that we'll hit a nrtReplica
     {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
index bec55d3..c0214f8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
@@ -40,7 +40,6 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.junit.After;
@@ -99,13 +98,6 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
 
   @Test
   public void testAsyncRequests() throws Exception {
-    boolean legacy = random().nextBoolean();
-    if (legacy) {
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true").process(cluster.getSolrClient());
-    } else {
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false").process(cluster.getSolrClient());
-    }
-    
     final String collection = "testAsyncOperations";
     final CloudSolrClient client = cluster.getSolrClient();
 
@@ -214,11 +206,9 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
       .processAndWait(client, MAX_TIMEOUT_SECONDS);
     assertSame("DeleteReplica did not complete", RequestStatusState.COMPLETED, state);
 
-    if (!legacy) {
-      state = CollectionAdminRequest.deleteCollection(collection)
-          .processAndWait(client, MAX_TIMEOUT_SECONDS);
-      assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
-    }
+    state = CollectionAdminRequest.deleteCollection(collection)
+        .processAndWait(client, MAX_TIMEOUT_SECONDS);
+    assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
   }
 
   public void testAsyncIdRaceCondition() throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index f0e92fa..2acb10c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -103,11 +103,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
 
     waitForThingsToLevelOut(15, TimeUnit.SECONDS);
 
-    if (usually()) {
-      log.info("Using legacyCloud=false for cluster");
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
-          .process(cloudClient);
-    }
     incompleteOrOverlappingCustomRangeTest();
     splitByUniqueKeyTest();
     splitByRouteFieldTest();
@@ -416,10 +411,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   public void testSplitWithChaosMonkey() throws Exception {
     waitForThingsToLevelOut(15, TimeUnit.SECONDS);
 
-    log.info("Using legacyCloud=false for cluster");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
-        .process(cloudClient);
-
     List<StoppableIndexingThread> indexers = new ArrayList<>();
     try {
       for (int i = 0; i < 1; i++) {
@@ -645,12 +636,6 @@ public class ShardSplitTest extends BasicDistributedZkTest {
   private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
     waitForThingsToLevelOut(15, TimeUnit.SECONDS);
 
-    if (usually()) {
-      log.info("Using legacyCloud=false for cluster");
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
-          .process(cloudClient);
-    }
-
     log.info("Starting testSplitShardWithRule");
     String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
     CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2)
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
index 971bb81..9c0ada7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
@@ -51,8 +51,7 @@ public class SimpleCollectionCreateDeleteTest extends AbstractFullDistribZkTestB
     }
     String collectionName = "SimpleCollectionCreateDeleteTest";
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,1,1)
-            .setCreateNodeSet(overseerNode)
-            .setStateFormat(2);
+            .setCreateNodeSet(overseerNode);
 
     NamedList<Object> request = create.process(cloudClient).getResponse();
 
@@ -92,8 +91,7 @@ public class SimpleCollectionCreateDeleteTest extends AbstractFullDistribZkTestB
 
       // create collection again on a node other than the overseer leader
       create = CollectionAdminRequest.createCollection(collectionName,1,1)
-              .setCreateNodeSet(notOverseerNode)
-              .setStateFormat(2);
+              .setCreateNodeSet(notOverseerNode);
       request = create.process(cloudClient).getResponse();
       assertTrue("Collection creation should not have failed", request.get("success") != null);
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index 804728b..9be6d9e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
-import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
@@ -34,7 +33,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -94,7 +92,6 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
     clusterStatusBadCollectionTest();
     replicaPropTest();
     clusterStatusZNodeVersion();
-    testClusterStateMigration();
     testCollectionCreationCollectionNameValidation();
     testCollectionCreationTooManyShards();
     testReplicationFactorValidaton();
@@ -895,34 +892,6 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
     }
   }
 
-  private void testClusterStateMigration() throws Exception {
-    try (CloudSolrClient client = createCloudClient(null)) {
-      client.connect();
-
-      CollectionAdminRequest.createCollection("testClusterStateMigration","conf1",1,1).setStateFormat(1).process(client);
-
-      waitForRecoveriesToFinish("testClusterStateMigration", true);
-
-      assertEquals(1, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
-
-      for (int i = 0; i < 10; i++) {
-        SolrInputDocument doc = new SolrInputDocument();
-        doc.addField("id", "id_" + i);
-        client.add("testClusterStateMigration", doc);
-      }
-      client.commit("testClusterStateMigration");
-
-      CollectionAdminRequest.migrateCollectionFormat("testClusterStateMigration").process(client);
-
-      client.getZkStateReader().forceUpdateCollection("testClusterStateMigration");
-
-      assertEquals(2, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
-
-      QueryResponse response = client.query("testClusterStateMigration", new SolrQuery("*:*"));
-      assertEquals(10, response.getResults().getNumFound());
-    }
-  }
-  
   private void testCollectionCreationCollectionNameValidation() throws Exception {
     try (CloudSolrClient client = createCloudClient(null)) {
       ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java b/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java
index 67e3244..0be579c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java
@@ -39,7 +39,7 @@ public class TestClusterStateMutator extends SolrTestCaseJ4 {
   }
   
   public void testCreateCollection() throws Exception {
-    ClusterState clusterState = new ClusterState(-1, Collections.<String>emptySet(), Collections.<String, DocCollection>emptyMap());
+    ClusterState clusterState = new ClusterState(Collections.<String>emptySet(), Collections.<String, DocCollection>emptyMap());
     DistribStateManager mockStateManager = mock(DistribStateManager.class);
     SolrCloudManager dataProvider = mock(SolrCloudManager.class);
     when(dataProvider.getDistribStateManager()).thenReturn(mockStateManager);
@@ -55,7 +55,7 @@ public class TestClusterStateMutator extends SolrTestCaseJ4 {
     assertEquals(1, collection.getSlicesMap().size());
     assertEquals(1, collection.getMaxShardsPerNode());
 
-    ClusterState state = new ClusterState(-1, Collections.<String>emptySet(), Collections.singletonMap("xyz", collection));
+    ClusterState state = new ClusterState(Collections.<String>emptySet(), Collections.singletonMap("xyz", collection));
     message = new ZkNodeProps(Utils.makeMap(
         "name", "abc",
         "numShards", "2",
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
index a765ada..604aec5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
@@ -46,11 +46,7 @@ public class ZkCollectionPropsCachingTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupClass() throws Exception {
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-
     configureCluster(4)
-        .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index f4c5bb2..2bf0971 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -19,7 +19,6 @@ package org.apache.solr.cloud.overseer;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.IOUtils;
@@ -34,109 +33,12 @@ import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
 
 public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
   private static final long TIMEOUT = 30;
 
-  /** Uses explicit refresh to ensure latest changes are visible. */
-  public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
-    testStateFormatUpdate(true, true);
-  }
-
-  /** Uses explicit refresh to ensure latest changes are visible. */
-  public void testStateFormatUpdateWithExplicitRefreshLazy() throws Exception {
-    testStateFormatUpdate(true, false);
-  }
-
-  /** ZkStateReader should automatically pick up changes based on ZK watches. */
-  public void testStateFormatUpdateWithTimeDelay() throws Exception {
-    testStateFormatUpdate(false, true);
-  }
-
-  /** ZkStateReader should automatically pick up changes based on ZK watches. */
-  public void testStateFormatUpdateWithTimeDelayLazy() throws Exception {
-    testStateFormatUpdate(false, false);
-  }
-
-  public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting) throws Exception {
-    Path zkDir = createTempDir("testStateFormatUpdate");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-
-    SolrZkClient zkClient = null;
-    ZkStateReader reader = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-      if (isInteresting) {
-        reader.registerCore("c1");
-      }
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
-      {
-        // create new collection with stateFormat = 1
-        DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
-        ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates();
-
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
-        assertNotNull(map.get("c1"));
-        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
-        assertFalse(exists);
-
-        if (explicitRefresh) {
-          reader.forceUpdateCollection("c1");
-        } else {
-          reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
-        }
-
-        DocCollection collection = reader.getClusterState().getCollection("c1");
-        assertEquals(1, collection.getStateFormat());
-      }
-
-
-      {
-        // Now update the collection to stateFormat = 2
-        DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
-        ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
-        writer.writePendingUpdates();
-
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
-        assertNull(map.get("c1"));
-        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
-        assertTrue(exists);
-
-        if (explicitRefresh) {
-          reader.forceUpdateCollection("c1");
-        } else {
-          reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS,
-              (n, c) -> c != null && c.getStateFormat() == 2);
-        }
-
-        DocCollection collection = reader.getClusterState().getCollection("c1");
-        assertEquals(2, collection.getStateFormat());
-      }
-    } finally {
-      IOUtils.close(reader, zkClient);
-      server.shutdown();
-
-    }
-  }
-
   public void testExternalCollectionWatchedNotWatched() throws Exception{
     Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched");
     ZkTestServer server = new ZkTestServer(zkDir);
@@ -156,9 +58,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
       zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
-      // create new collection with stateFormat = 2
+      // create new collection
       ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
+          new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
       writer.writePendingUpdates();
       reader.forceUpdateCollection("c1");
@@ -195,14 +97,14 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
       ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
+      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0);
       ZkWriteCommand wc = new ZkWriteCommand("c1", state);
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
       writer.writePendingUpdates();
       assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true));
       reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
 
-      state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
+      state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0);
       wc = new ZkWriteCommand("c1", state);
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
       writer.writePendingUpdates();
@@ -253,8 +155,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
 
-      // create new collection with stateFormat = 2
-      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
+      // create new collection
+      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0);
       ZkWriteCommand wc = new ZkWriteCommand("c1", state);
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
       writer.writePendingUpdates();
@@ -266,7 +168,6 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
       assertNotNull(ref);
       assertFalse(ref.isLazilyLoaded());
-      assertEquals(2, ref.get().getStateFormat());
     } finally {
       IOUtils.close(reader, zkClient);
       server.shutdown();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index bc2b42d..ea4e1e8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -80,12 +80,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
 
-        ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
-        ZkWriteCommand c2 = new ZkWriteCommand("c2",
-            new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
-        ZkWriteCommand c3 = new ZkWriteCommand("c3",
-            new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c3"));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
         ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
         // First write is flushed immediately
@@ -110,46 +107,6 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
     }
   }
 
-  public void testSingleLegacyCollection() throws Exception {
-    Path zkDir = createTempDir("testSingleLegacyCollection");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-
-    SolrZkClient zkClient = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
-        reader.createClusterStateWatchersAndUpdate();
-
-        ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-
-        // create new collection with stateFormat = 1
-        ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates();
-
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
-        assertNotNull(map.get("c1"));
-        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", true);
-        assertFalse(exists);
-      }
-
-    } finally {
-      IOUtils.close(zkClient);
-      server.shutdown();
-
-    }
-  }
-
   public void testSingleExternalCollection() throws Exception {
     Path zkDir = createTempDir("testSingleExternalCollection");
 
@@ -170,29 +127,23 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
 
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
-        // create new collection with stateFormat = 2
+        // create new collection
         ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
         writer.writePendingUpdates();
 
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null, true));
-        assertNull(map.get("c1"));
-        map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
+        Map map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
         assertNotNull(map.get("c1"));
       }
-
     } finally {
       IOUtils.close(zkClient);
       server.shutdown();
-
     }
-
-
   }
 
-  public void testExternalModificationToSharedClusterState() throws Exception {
+  public void testExternalModification() throws Exception {
     Path zkDir = createTempDir("testExternalModification");
 
     ZkTestServer server = new ZkTestServer(zkDir);
@@ -213,90 +164,15 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
 
-        // create collection 1 with stateFormat = 1
-        ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates();
-
-        reader.forceUpdateCollection("c1");
-        reader.forceUpdateCollection("c2");
-        ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
-        assertTrue(clusterState.hasCollection("c1"));
-        assertFalse(clusterState.hasCollection("c2"));
-
-        // Simulate an external modification to /clusterstate.json
-        byte[] data = zkClient.getData("/clusterstate.json", null, null, true);
-        zkClient.setData("/clusterstate.json", data, true);
-
-        // enqueue another c1 so that ZkStateWriter has pending updates
-        writer.enqueueUpdate(clusterState, Collections.singletonList(c1), null);
-        assertTrue(writer.hasPendingUpdates());
-
-        // Will trigger flush
-        Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
-        ZkWriteCommand c2 = new ZkWriteCommand("c2",
-            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
-
-        try {
-          writer.enqueueUpdate(clusterState, Collections.singletonList(c2), null); // we are sending in the old cluster state object
-          fail("Enqueue should not have succeeded");
-        } catch (KeeperException.BadVersionException bve) {
-          // expected
-        }
-
-        try {
-          writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
-          fail("enqueueUpdate after BadVersionException should not have succeeded");
-        } catch (IllegalStateException e) {
-          // expected
-        }
-
-        try {
-          writer.writePendingUpdates();
-          fail("writePendingUpdates after BadVersionException should not have succeeded");
-        } catch (IllegalStateException e) {
-          // expected
-        }
-      }
-    } finally {
-      IOUtils.close(zkClient);
-      server.shutdown();
-    }
-
-  }
-
-  public void testExternalModificationToStateFormat2() throws Exception {
-    Path zkDir = createTempDir("testExternalModificationToStateFormat2");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-
-    SolrZkClient zkClient = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
-      ZkController.createClusterZkNodes(zkClient);
-
-      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
-        reader.createClusterStateWatchersAndUpdate();
-
-        ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
-        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
-
         ClusterState state = reader.getClusterState();
 
-        // create collection 2 with stateFormat = 2
+        // create collection 2
         ZkWriteCommand c2 = new ZkWriteCommand("c2",
-            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0));
         state = writer.enqueueUpdate(state, Collections.singletonList(c2), null);
         assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
 
-        int sharedClusterStateVersion = state.getZkClusterStateVersion();
-        int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
+        int c2Version = state.getCollection("c2").getZNodeVersion();
 
         // Simulate an external modification to /collections/c2/state.json
         byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null, true);
@@ -307,8 +183,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         state = reader.getClusterState();
         log.info("Cluster state: {}", state);
         assertTrue(state.hasCollection("c2"));
-        assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
-        assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
+        assertEquals(c2Version + 1, state.getCollection("c2").getZNodeVersion());
 
         writer.enqueueUpdate(state, Collections.singletonList(c2), null);
         assertTrue(writer.hasPendingUpdates());
@@ -320,7 +195,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         // Will trigger flush
         Thread.sleep(Overseer.STATE_UPDATE_DELAY+100);
         ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0));
 
         try {
           writer.enqueueUpdate(state, Collections.singletonList(c1), null);
diff --git a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
index f0bae3b..56c0988 100644
--- a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
+++ b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
@@ -244,7 +244,7 @@ public class TestSolrCloudSnapshots extends SolrCloudTestCase {
     CollectionAdminRequest.DeleteSnapshot deleteSnap = new CollectionAdminRequest.DeleteSnapshot(collectionName, commitName);
     deleteSnap.process(solrClient);
 
-    // Wait for a while so that the clusterstate.json updates are propagated to the client side.
+    // Wait for a while so that the cluster state updates are propagated to the client side.
     Thread.sleep(2000);
     collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
 
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
index a646912..6731b70 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
@@ -88,20 +88,20 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
     //test a simple create collection call
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, replicationFactor:2 }}", null,
-        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create}");
+        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create}");
     
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, nrtReplicas:2 }}", null,
-        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create}");
+        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', collection.configName:schemaless, numShards:'2', operation:create}");
     
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, nrtReplicas:2, tlogReplicas:2, pullReplicas:2 }}", null,
-        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', tlogReplicas:'2', pullReplicas:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create}");
+        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', tlogReplicas:'2', pullReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create}");
 
     //test a create collection with custom properties
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, replicationFactor:2, properties:{prop1:'prop1val', prop2: prop2val} }}", null,
-        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create, property.prop1:prop1val, property.prop2:prop2val}");
+        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create, property.prop1:prop1val, property.prop2:prop2val}");
 
 
     compareOutput(apiBag, "/collections", POST,
diff --git a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
index 5380a32..32c665e 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
@@ -142,7 +142,7 @@ public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
         "1.2.3.4:9000_",
         "1.2.3.4:9001_solr-2",
     }));
-    ClusterState cs = new ClusterState(0, liveNodes, new HashMap<>());
+    ClusterState cs = new ClusterState(liveNodes, new HashMap<>());
     WhitelistHostChecker checker = new WhitelistHostChecker(null, true);
     Set<String> hostSet = checker.generateWhitelistFromLiveNodes(cs);
     assertThat(hostSet.size(), is(3));
diff --git a/solr/core/src/test/org/apache/solr/util/TestUtils.java b/solr/core/src/test/org/apache/solr/util/TestUtils.java
index beb0bf6..71ff9bc 100644
--- a/solr/core/src/test/org/apache/solr/util/TestUtils.java
+++ b/solr/core/src/test/org/apache/solr/util/TestUtils.java
@@ -316,7 +316,7 @@ public class TestUtils extends SolrTestCaseJ4 {
     assertEquals("b1", Utils.getObjectByPath(sink, true, "k1/k11/a1"));
 
     sink = new HashMap<>();
-    sink.put("legacyCloud", "false");
+    sink.put("autoAddReplicas", "false");
     assertTrue(Utils.mergeJson(sink, (Map<String, Object>) Utils.fromJSONString("collectionDefaults:{numShards:3 , nrtReplicas:2}")));
     assertEquals(3L, Utils.getObjectByPath(sink, true, ImmutableList.of(COLLECTION_DEF, NUM_SHARDS_PROP)));
     assertEquals(2L, Utils.getObjectByPath(sink, true, ImmutableList.of(COLLECTION_DEF, NRT_REPLICAS)));
diff --git a/solr/solr-ref-guide/src/cluster-node-management.adoc b/solr/solr-ref-guide/src/cluster-node-management.adoc
index e477802..4269ed0 100644
--- a/solr/solr-ref-guide/src/cluster-node-management.adoc
+++ b/solr/solr-ref-guide/src/cluster-node-management.adoc
@@ -131,7 +131,7 @@ Add, edit or delete a cluster-wide property.
 === CLUSTERPROP Parameters
 
 `name`::
-The name of the property. Supported properties names are `autoAddReplicas`, `legacyCloud`, `location`, `maxCoresPerNode`, `urlScheme` and `defaultShardPreferences`. Other properties can be set
+The name of the property. Supported property names are `autoAddReplicas`, `location`, `maxCoresPerNode`, `urlScheme` and `defaultShardPreferences`. Other properties can be set
 (for example, if you need them for custom plugins) but they must begin with the prefix `ext.`. Unknown properties that don't begin with `ext.` will be rejected.
 
 `val`::
@@ -498,21 +498,4 @@ http://localhost:8983/solr/admin/collections?action=OVERSEERSTATUS
   ],
   "..."
  }
-----
-
-[[migratestateformat]]
-== MIGRATESTATEFORMAT: Migrate Cluster State
-
-A expert level utility API to move a collection from shared `clusterstate.json` ZooKeeper node (created with `stateFormat=1`, the default in all Solr releases prior to 5.0) to the per-collection `state.json` stored in ZooKeeper (created with `stateFormat=2`, the current default) seamlessly without any application down-time.
-
-`/admin/collections?action=MIGRATESTATEFORMAT&collection=<collection_name>`
-
-=== MIGRATESTATEFORMAT Parameters
-
-`collection`::
-The name of the collection to be migrated from `clusterstate.json` to its own `state.json` ZooKeeper node. This parameter is required.
-
-`async`::
-Request ID to track this action which will be <<collections-api.adoc#asynchronous-calls,processed asynchronously>>.
-
-This API is useful in migrating any collections created prior to Solr 5.0 to the more scalable cluster state format now used by default. If a collection was created in any Solr 5.x version or higher, then executing this command is not necessary.
+----
\ No newline at end of file
diff --git a/solr/solr-ref-guide/src/collection-management.adoc b/solr/solr-ref-guide/src/collection-management.adoc
index 981ad5f..6d2b2a9 100644
--- a/solr/solr-ref-guide/src/collection-management.adoc
+++ b/solr/solr-ref-guide/src/collection-management.adoc
@@ -903,7 +903,6 @@ http://localhost:8983/solr/admin/collections?action=COLSTATUS&collection=getting
         "QTime": 50
     },
     "gettingstarted": {
-        "stateFormat": 2,
         "znodeVersion": 16,
         "properties": {
             "autoAddReplicas": "false",
@@ -1049,7 +1048,6 @@ http://localhost:8983/solr/admin/collections?action=COLSTATUS&collection=getting
         "QTime": 26812
     },
     "gettingstarted": {
-        "stateFormat": 2,
         "znodeVersion": 33,
         "properties": {
             "autoAddReplicas": "false",
diff --git a/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc b/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc
index cdb0b86..1e7774a 100644
--- a/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc
+++ b/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc
@@ -105,8 +105,18 @@ _(raw; not yet edited)_
 
 * SOLR-11775: Return long value for facet count in Json Facet module irrespective of number of shards (hossman, Munendra S N)
 
+* SOLR-12823: Remove /clusterstate.json support, i.e. support for collections created with stateFormat=1 as well as support
+  for Collection API MIGRATESTATEFORMAT action. Also removes support for cluster property `legacyCloud` (Solr now always behaves as if it were set to `false`).
+
 === Upgrade Prerequisites in Solr 9
 
+* Upgrade all collections in stateFormat=1 to stateFormat=2 *before* upgrading to Solr 9, as Solr 9 does not support the
+older format and no longer supports migrating collections from the older format to the current format (previously known
+as stateFormat=2).
+Perform the upgrade with the Collection API MIGRATESTATEFORMAT action while still running a previous version of Solr.
+See for example https://lucene.apache.org/solr/guide/8_5/cluster-node-management.html#migratestateformat[Solr 8.5 Ref Guide].
+// Can't link directly to .adoc file, need to link to 8.something ref guide as MIGRATESTATEFORMAT no longer exists in 9.0.
+
 === Rolling Upgrades with Solr 9
 
 === Reindexing After Upgrades in Solr 9
diff --git a/solr/solr-ref-guide/src/rule-based-replica-placement.adoc b/solr/solr-ref-guide/src/rule-based-replica-placement.adoc
index 34f990c..b22383b 100644
--- a/solr/solr-ref-guide/src/rule-based-replica-placement.adoc
+++ b/solr/solr-ref-guide/src/rule-based-replica-placement.adoc
@@ -174,4 +174,4 @@ Rules are specified per collection during collection creation as request paramet
 snitch=class:EC2Snitch&rule=shard:*,replica:1,dc:dc1&rule=shard:*,replica:<2,dc:dc3
 ----
 
-These rules are persisted in `clusterstate.json` in ZooKeeper and are available throughout the lifetime of the collection. This enables the system to perform any future node allocation without direct user interaction. The rules added during collection creation can be modified later using the <<collection-management.adoc#modifycollection,MODIFYCOLLECTION>> API.
+These rules are persisted in the collection's `state.json` in ZooKeeper and are available throughout the lifetime of the collection. This enables the system to perform any future node allocation without direct user interaction. The rules added during collection creation can be modified later using the <<collection-management.adoc#modifycollection,MODIFYCOLLECTION>> API.
diff --git a/solr/solr-ref-guide/src/shard-management.adoc b/solr/solr-ref-guide/src/shard-management.adoc
index fa0712d..228d5f9 100644
--- a/solr/solr-ref-guide/src/shard-management.adoc
+++ b/solr/solr-ref-guide/src/shard-management.adoc
@@ -272,7 +272,7 @@ http://localhost:8983/solr/admin/collections?action=CREATESHARD&collection=anImp
 [[deleteshard]]
 == DELETESHARD: Delete a Shard
 
-Deleting a shard will unload all replicas of the shard, remove them from `clusterstate.json`, and (by default) delete the instanceDir and dataDir for each replica. It will only remove shards that are inactive, or which have no range given for custom sharding.
+Deleting a shard will unload all replicas of the shard, remove them from the collection's `state.json`, and (by default) delete the instanceDir and dataDir for each replica. It will only remove shards that are inactive, or which have no range given for custom sharding.
 
 `/admin/collections?action=DELETESHARD&shard=_shardID_&collection=_name_`
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index c137100..0f17306 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -550,7 +550,6 @@ public class Policy implements MapWriter {
     final SolrCloudManager cloudManager;
     final List<Row> matrix;
     final NodeStateProvider nodeStateProvider;
-    final int znodeVersion;
     Set<String> collections = new HashSet<>();
     final Policy policy;
     List<Clause> expandedClauses;
@@ -569,7 +568,6 @@ public class Policy implements MapWriter {
       } catch (Exception e) {
         log.trace("-- session created, can't obtain cluster state", e);
       }
-      this.znodeVersion = state != null ? state.getZNodeVersion() : -1;
       this.nodes = new ArrayList<>(cloudManager.getClusterStateProvider().getLiveNodes());
       this.cloudManager = cloudManager;
       for (String node : nodes) {
@@ -608,7 +606,7 @@ public class Policy implements MapWriter {
     }
 
     private Session(List<String> nodes, SolrCloudManager cloudManager,
-                   List<Row> matrix, List<Clause> expandedClauses, int znodeVersion,
+                   List<Row> matrix, List<Clause> expandedClauses,
                    NodeStateProvider nodeStateProvider, Policy policy, Transaction transaction) {
       this.transaction = transaction;
       this.policy = policy;
@@ -616,7 +614,6 @@ public class Policy implements MapWriter {
       this.cloudManager = cloudManager;
       this.matrix = matrix;
       this.expandedClauses = expandedClauses;
-      this.znodeVersion = znodeVersion;
       this.nodeStateProvider = nodeStateProvider;
       for (Row row : matrix) row.session = this;
     }
@@ -638,7 +635,7 @@ public class Policy implements MapWriter {
     }
 
     Session copy() {
-      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider, policy, transaction);
+      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, nodeStateProvider, policy, transaction);
     }
 
     public Row getNode(String node) {
@@ -687,7 +684,6 @@ public class Policy implements MapWriter {
 
     @Override
     public void writeMap(EntryWriter ew) throws IOException {
-      ew.put("znodeVersion", znodeVersion);
       for (Row row : matrix) {
         ew.put(row.node, row);
       }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 73dc6c1..6f1223f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -227,7 +227,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
   }
 
-  /** Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
+  /** Sets the cache TTL for cached DocCollection objects.
    * @param seconds ttl value in seconds
    */
   public void setCollectionCacheTTl(int seconds){
@@ -877,18 +877,16 @@ public abstract class BaseCloudSolrClient extends SolrClient {
           throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not found: " + requestedCollection);
         }
         int collVer = coll.getZNodeVersion();
-        if (coll.getStateFormat()>1) {
-          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
-          requestedCollections.add(coll);
+        if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+        requestedCollections.add(coll);
 
-          if (stateVerParamBuilder == null) {
-            stateVerParamBuilder = new StringBuilder();
-          } else {
-            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
-          }
-
-          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        if (stateVerParamBuilder == null) {
+          stateVerParamBuilder = new StringBuilder();
+        } else {
+          stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
         }
+
+        stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
       }
 
       if (stateVerParamBuilder != null) {
@@ -1226,8 +1224,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
         cacheEntry.setRetriedAt();//we retried and found that it is the same version
         cacheEntry.maybeStale = false;
       } else {
-        if (fetchedCol.getStateFormat() > 1)
-          collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
+        collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol));
       }
       return fetchedCol;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index 03fb2aa..8ed7854 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -138,8 +138,7 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
     Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
     this.liveNodes = liveNodes;
     liveNodesTimestamp = System.nanoTime();
-    //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
-    ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
+    ClusterState cs = ClusterState.createFromCollectionMap(znodeVersion, collectionsMap, liveNodes);
     if (clusterProperties != null) {
       Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
       if (properties != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index c2e7869..2ec73a7 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -447,7 +447,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected Properties properties;
     protected Boolean autoAddReplicas;
     protected String alias;
-    protected Integer stateFormat;
     protected String[] rule , snitch;
     protected String withCollection;
 
@@ -486,7 +485,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create setPullReplicas(Integer pullReplicas) { this.pullReplicas = pullReplicas; return this;}
 
     public Create setReplicationFactor(Integer repl) { this.nrtReplicas = repl; return this; }
-    public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
 
@@ -508,8 +506,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Integer getNumTlogReplicas() {return tlogReplicas;}
     public Integer getNumPullReplicas() {return pullReplicas;}
 
-    public Integer getStateFormat() { return stateFormat; }
-
     /**
      * Provide the name of the shards to be created, separated by commas
      *
@@ -579,9 +575,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (properties != null) {
         addProperties(params, properties);
       }
-      if (stateFormat != null) {
-        params.set(DocCollection.STATE_FORMAT, stateFormat);
-      }
       if (pullReplicas != null) {
         params.set(ZkStateReader.PULL_REPLICAS, pullReplicas);
       }
@@ -2798,35 +2791,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       params.set("property", propertyName);
       return params;
     }
-
-
-  }
-
-  /**
-   * Returns a SolrRequest to migrate a collection state format
-   *
-   * This is an expert-level request, and should not generally be necessary.
-   */
-  public static MigrateClusterState migrateCollectionFormat(String collection) {
-    return new MigrateClusterState(collection);
-  }
-
-  // MIGRATECLUSTERSTATE request
-  public static class MigrateClusterState extends AsyncCollectionAdminRequest {
-
-    protected String collection;
-
-    private MigrateClusterState(String collection) {
-      super(CollectionAction.MIGRATESTATEFORMAT);
-      this.collection = checkNotNull(CoreAdminParams.COLLECTION, collection);
-    }
-
-    @Override
-    public SolrParams getParams() {
-      ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
-      params.set(CoreAdminParams.COLLECTION, collection);
-      return params;
-    }
   }
 
   /**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 80f26f7..2ac25f6 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -39,8 +39,6 @@ import org.noggit.JSONWriter;
  * @lucene.experimental
  */
 public class ClusterState implements JSONWriter.Writable {
-  
-  private final Integer znodeVersion;
 
   private final Map<String, CollectionRef> collectionStates, immutableCollectionStates;
   private Set<String> liveNodes;
@@ -48,9 +46,8 @@ public class ClusterState implements JSONWriter.Writable {
   /**
    * Use this constr when ClusterState is meant for consumption.
    */
-  public ClusterState(Integer znodeVersion, Set<String> liveNodes,
-      Map<String, DocCollection> collectionStates) {
-    this(liveNodes, getRefMap(collectionStates),znodeVersion);
+  public ClusterState(Set<String> liveNodes, Map<String, DocCollection> collectionStates) {
+    this(getRefMap(collectionStates), liveNodes);
   }
 
   private static Map<String, CollectionRef> getRefMap(Map<String, DocCollection> collectionStates) {
@@ -62,26 +59,26 @@ public class ClusterState implements JSONWriter.Writable {
     return collRefs;
   }
 
-  /**Use this if all the collection states are not readily available and some needs to be lazily loaded
+  /**
+   * Use this if not all the collection states are readily available and some need to be lazily loaded
+   * (parameter order different from constructor above to have different erasures)
    */
-  public ClusterState(Set<String> liveNodes, Map<String, CollectionRef> collectionStates, Integer znodeVersion){
-    this.znodeVersion = znodeVersion;
+  public ClusterState(Map<String, CollectionRef> collectionStates, Set<String> liveNodes){
     this.liveNodes = new HashSet<>(liveNodes.size());
     this.liveNodes.addAll(liveNodes);
     this.collectionStates = new LinkedHashMap<>(collectionStates);
     this.immutableCollectionStates = Collections.unmodifiableMap(collectionStates);
   }
 
-
   /**
    * Returns a new cluster state object modified with the given collection.
    *
    * @param collectionName the name of the modified (or deleted) collection
    * @param collection     the collection object. A null value deletes the collection from the state
-   * @return the updated cluster state which preserves the current live nodes and zk node version
+   * @return the updated cluster state which preserves the current live nodes
    */
   public ClusterState copyWith(String collectionName, DocCollection collection) {
-    ClusterState result = new ClusterState(liveNodes, new LinkedHashMap<>(collectionStates), znodeVersion);
+    ClusterState result = new ClusterState(new LinkedHashMap<>(collectionStates), liveNodes);
     if (collection == null) {
       result.collectionStates.remove(collectionName);
     } else {
@@ -91,13 +88,6 @@ public class ClusterState implements JSONWriter.Writable {
   }
 
   /**
-   * Returns the zNode version that was used to construct this instance.
-   */
-  public int getZNodeVersion() {
-    return znodeVersion;
-  }
-
-  /**
    * Returns true if the specified collection name exists, false otherwise.
    *
    * Implementation note: This method resolves the collection reference by calling
@@ -210,47 +200,42 @@ public class ClusterState implements JSONWriter.Writable {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("znodeVersion: ").append(znodeVersion);
-    sb.append("\n");
     sb.append("live nodes:").append(liveNodes);
     sb.append("\n");
     sb.append("collections:").append(collectionStates);
     return sb.toString();
   }
 
-  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
-    return load(version, bytes, liveNodes, ZkStateReader.CLUSTER_STATE);
-  }
   /**
-   * Create ClusterState from json string that is typically stored in zookeeper.
+   * Create a ClusterState from JSON.
    * 
-   * @param version zk version of the clusterstate.json file (bytes)
-   * @param bytes clusterstate.json as a byte array
+   * @param version the ZooKeeper version given to every {@link DocCollection} created (ignored when bytes is null or empty)
+   * @param bytes a byte array of a JSON mapping from collection name to the JSON representation of a
+   *              {@link DocCollection} as written by {@link #write(JSONWriter)}; it can represent one or more collections.
    * @param liveNodes list of live nodes
    * @return the ClusterState
    */
-  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, String znode) {
-    // System.out.println("######## ClusterState.load:" + (bytes==null ? null : new String(bytes)));
+  public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
     if (bytes == null || bytes.length == 0) {
-      return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
+      return new ClusterState(liveNodes, Collections.<String, DocCollection>emptyMap());
     }
     Map<String, Object> stateMap = (Map<String, Object>) Utils.fromJSON(bytes);
-    return load(version, stateMap, liveNodes, znode);
+    return createFromCollectionMap(version, stateMap, liveNodes);
   }
 
-  public static ClusterState load(Integer version, Map<String, Object> stateMap, Set<String> liveNodes, String znode) {
+  public static ClusterState createFromCollectionMap(int version, Map<String, Object> stateMap, Set<String> liveNodes) {
     Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
       String collectionName = entry.getKey();
-      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
+      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
       collections.put(collectionName, new CollectionRef(coll));
     }
 
-    return new ClusterState( liveNodes, collections,version);
+    return new ClusterState(collections, liveNodes);
   }
 
   // TODO move to static DocCollection.loadFromMap
-  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
+  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, int version) {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
@@ -277,43 +262,25 @@ public class ClusterState implements JSONWriter.Writable {
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return new DocCollection(name, slices, props, router, version, znode);
+    return new DocCollection(name, slices, props, router, version);
   }
 
   @Override
   public void write(JSONWriter jsonWriter) {
     LinkedHashMap<String , DocCollection> map = new LinkedHashMap<>();
     for (Entry<String, CollectionRef> e : collectionStates.entrySet()) {
-      // using this class check to avoid fetching from ZK in case of lazily loaded collection
       if (e.getValue().getClass() == CollectionRef.class) {
-        // check if it is a lazily loaded collection outside of clusterstate.json
         DocCollection coll = e.getValue().get();
-        if (coll.getStateFormat() == 1) {
-          map.put(coll.getName(),coll);
-        }
+        map.put(coll.getName(),coll);
       }
     }
     jsonWriter.write(map);
   }
 
-  /**
-   * The version of clusterstate.json in ZooKeeper.
-   * 
-   * @return null if ClusterState was created for publication, not consumption
-   * @deprecated true cluster state spans many ZK nodes, stop depending on the version number of the shared node!
-   * will be removed in 8.0
-   */
-  @Deprecated
-  public Integer getZkClusterStateVersion() {
-    return znodeVersion;
-  }
-
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result
-        + ((znodeVersion == null) ? 0 : znodeVersion.hashCode());
     result = prime * result + ((liveNodes == null) ? 0 : liveNodes.hashCode());
     return result;
   }
@@ -324,13 +291,9 @@ public class ClusterState implements JSONWriter.Writable {
     if (obj == null) return false;
     if (getClass() != obj.getClass()) return false;
     ClusterState other = (ClusterState) obj;
-    if (znodeVersion == null) {
-      if (other.znodeVersion != null) return false;
-    } else if (!znodeVersion.equals(other.znodeVersion)) return false;
     if (liveNodes == null) {
-      if (other.liveNodes != null) return false;
-    } else if (!liveNodes.equals(other.liveNodes)) return false;
-    return true;
+      return other.liveNodes == null;
+    } else return liveNodes.equals(other.liveNodes);
   }
 
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 08d5296..831d97d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -50,7 +50,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
-  public static final String STATE_FORMAT = "stateFormat";
   public static final String RULE = "rule";
   public static final String SNITCH = "snitch";
 
@@ -63,7 +62,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Map<String, List<Replica>> nodeNameReplicas;
   private final Map<String, List<Replica>> nodeNameLeaderReplicas;
   private final DocRouter router;
-  private final String znode;
 
   private final Integer replicationFactor;
   private final Integer numNrtReplicas;
@@ -75,15 +73,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Boolean readOnly;
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, Integer.MAX_VALUE, ZkStateReader.CLUSTER_STATE);
+    this(name, slices, props, router, Integer.MAX_VALUE);
   }
 
   /**
    * @param name  The name of the collection
    * @param slices The logical shards of the collection.  This is used directly and a copy is not made.
    * @param props  The properties of the slice.  This is used directly and a copy is not made.
+   * @param zkVersion The version of the Collection node in ZooKeeper (used for conditional updates).
    */
-  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
+  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
     super(props==null ? props = new HashMap<>() : props);
     // -1 means any version in ZK CAS, so we choose Integer.MAX_VALUE instead to avoid accidental overwrites
     this.znodeVersion = zkVersion == -1 ? Integer.MAX_VALUE : zkVersion;
@@ -119,7 +118,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
     this.router = router;
-    this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
     assert name != null && slices != null;
   }
 
@@ -172,7 +170,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @return the resulting DocCollection
    */
   public DocCollection copyWithSlices(Map<String, Slice> slices){
-    return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
+    return new DocCollection(getName(), slices, propMap, router, znodeVersion);
   }
 
   /**
@@ -247,9 +245,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return znodeVersion;
   }
 
-  public int getStateFormat() {
-    return ZkStateReader.CLUSTER_STATE.equals(znode) ? 1 : 2;
-  }
   /**
    * @return replication factor for this collection or null if no
    *         replication factor exists.
@@ -270,11 +265,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return maxShardsPerNode == 0 ? Integer.MAX_VALUE : maxShardsPerNode;
   }
 
-  public String getZNode(){
-    return znode;
-  }
-
-
   public DocRouter getRouter() {
     return router;
   }
@@ -285,7 +275,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   @Override
   public String toString() {
-    return "DocCollection("+name+"/" + znode + "/" + znodeVersion + ")=" + toJSONString(this);
+    return "DocCollection("+name+"/" + znodeVersion + ")=" + toJSONString(this);
   }
 
   @Override
@@ -386,10 +376,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   @Override
   public boolean equals(Object that) {
-    if (that instanceof DocCollection == false)
+    if (!(that instanceof DocCollection))
       return false;
     DocCollection other = (DocCollection) that;
-    return super.equals(that) && Objects.equals(this.znode, other.znode) && this.znodeVersion == other.znodeVersion;
+    return super.equals(that) && Objects.equals(this.name, other.name) && this.znodeVersion == other.znodeVersion;
   }
 
   /**
@@ -426,6 +416,5 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     if (type == Replica.Type.PULL) result = numPullReplicas;
     if (type == Replica.Type.TLOG) result = numTlogReplicas;
     return result == null ? def : result;
-
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 7af119a..051870d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -608,12 +608,7 @@ public class SolrZkClient implements Closeable {
     string.append(dent).append(path).append(" (").append(children.size()).append(")").append(NEWL);
     if (data != null) {
       String dataString = new String(data, StandardCharsets.UTF_8);
-      if ((!path.endsWith(".txt") && !path.endsWith(".xml")) || path.endsWith(ZkStateReader.CLUSTER_STATE)) {
-        if (path.endsWith(".xml")) {
-          // this is the cluster state in xml format - lets pretty print
-          dataString = prettyPrint(dataString);
-        }
-
+      if (!path.endsWith(".txt") && !path.endsWith(".xml")) {
         string.append(dent).append("DATA:\n").append(dent).append("    ").append(dataString.replaceAll("\n", "\n" + dent + "    ")).append(NEWL);
       } else {
         string.append(dent).append("DATA: ...supressed...").append(NEWL);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 29074e8..a1d5531 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -29,7 +29,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -70,8 +69,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.util.Collections.EMPTY_MAP;
-import static java.util.Collections.emptyMap;
-import static java.util.Collections.emptySet;
 import static java.util.Collections.emptySortedSet;
 import static org.apache.solr.common.util.Utils.fromJSON;
 
@@ -109,7 +106,12 @@ public class ZkStateReader implements SolrCloseable {
   public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
   public static final String ALIASES = "/aliases.json";
-  public static final String CLUSTER_STATE = "/clusterstate.json";
+  /**
+   * This ZooKeeper file is no longer used as of Solr 9; the name is kept only so that we can check whether it
+   * is still present and non-empty (e.g. after an upgrade from a previous Solr version). It used to contain the
+   * collection state for all collections in the cluster.
+   */
+  public static final String UNSUPPORTED_CLUSTER_STATE = "/clusterstate.json";
   public static final String CLUSTER_PROPS = "/clusterprops.json";
   public static final String COLLECTION_PROPS_ZKNODE = "collectionprops.json";
   public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
@@ -136,7 +138,6 @@ public class ZkStateReader implements SolrCloseable {
   public static final String CONFIGS_ZKNODE = "/configs";
   public final static String CONFIGNAME_PROP = "configName";
 
-  public static final String LEGACY_CLOUD = "legacyCloud";
   public static final String SAMPLE_PERCENTAGE = "samplePercentage";
 
   /**
@@ -152,7 +153,7 @@ public class ZkStateReader implements SolrCloseable {
   public static final String REPLICA_TYPE = "type";
 
   /**
-   * A view of the current state of all collections; combines all the different state sources into a single view.
+   * A view of the current state of all collections.
    */
   protected volatile ClusterState clusterState;
 
@@ -166,22 +167,12 @@ public class ZkStateReader implements SolrCloseable {
   public static final String ELECTION_NODE = "election";
 
   /**
-   * Collections tracked in the legacy (shared) state format, reflects the contents of clusterstate.json.
-   */
-  private Map<String, ClusterState.CollectionRef> legacyCollectionStates = emptyMap();
-
-  /**
-   * Last seen ZK version of clusterstate.json.
-   */
-  private int legacyClusterStateVersion = 0;
-
-  /**
-   * Collections with format2 state.json, "interesting" and actively watched.
+   * "Interesting" and actively watched Collections.
    */
   private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<>();
 
   /**
-   * Collections with format2 state.json, not "interesting" and not actively watched.
+   * Collections that are not "interesting" and not actively watched (their state is loaded lazily).
    */
   private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
 
@@ -191,7 +182,7 @@ public class ZkStateReader implements SolrCloseable {
   private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
 
   /**
-   * Collection properties being actively watched
+   * Watchers of Collection properties
    */
   private final ConcurrentHashMap<String, PropsWatcher> collectionPropsWatchers = new ConcurrentHashMap<>();
 
@@ -273,7 +264,6 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   public static final Set<String> KNOWN_CLUSTER_PROPS = Set.of(
-      LEGACY_CLOUD,
       URL_SCHEME,
       AUTO_ADD_REPLICAS,
       CoreAdminParams.BACKUP_LOCATION,
@@ -388,7 +378,6 @@ public class ZkStateReader implements SolrCloseable {
       // No need to set watchers because we should already have watchers registered for everything.
       refreshCollectionList(null);
       refreshLiveNodes(null);
-      refreshLegacyClusterState(null);
       // Need a copy so we don't delete from what we're iterating over.
       Collection<String> safeCopy = new ArrayList<>(watchedCollectionStates.keySet());
       Set<String> updatedCollections = new HashSet<>();
@@ -416,28 +405,21 @@ public class ZkStateReader implements SolrCloseable {
       }
 
       ClusterState.CollectionRef ref = clusterState.getCollectionRef(collection);
-      if (ref == null || legacyCollectionStates.containsKey(collection)) {
-        // We either don't know anything about this collection (maybe it's new?) or it's legacy.
-        // First update the legacy cluster state.
-        log.debug("Checking legacy cluster state for collection {}", collection);
-        refreshLegacyClusterState(null);
-        if (!legacyCollectionStates.containsKey(collection)) {
-          // No dice, see if a new collection just got created.
-          LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection);
-          if (tryLazyCollection.get() != null) {
-            // What do you know, it exists!
-            log.debug("Adding lazily-loaded reference for collection {}", collection);
-            lazyCollectionStates.putIfAbsent(collection, tryLazyCollection);
-            constructState(Collections.singleton(collection));
-          }
+      if (ref == null) {
+        // We don't know anything about this collection (maybe it's new?).
+        // See if it just got created.
+        LazyCollectionRef tryLazyCollection = new LazyCollectionRef(collection);
+        if (tryLazyCollection.get() != null) {
+          // What do you know, it exists!
+          log.debug("Adding lazily-loaded reference for collection {}", collection);
+          lazyCollectionStates.putIfAbsent(collection, tryLazyCollection);
+          constructState(Collections.singleton(collection));
         }
       } else if (ref.isLazilyLoaded()) {
         log.debug("Refreshing lazily-loaded state for collection {}", collection);
         if (ref.get() != null) {
           return;
         }
-        // Edge case: if there's no external collection, try refreshing legacy cluster state in case it's there.
-        refreshLegacyClusterState(null);
       } else if (watchedCollectionStates.containsKey(collection)) {
         // Exists as a watched collection, force a refresh.
         log.debug("Forcing refresh of watched collection state for {}", collection);
@@ -446,10 +428,9 @@ public class ZkStateReader implements SolrCloseable {
           constructState(Collections.singleton(collection));
         }
       } else {
-        log.error("Collection {} is not lazy or watched!", collection);
+        log.error("Collection {} is not lazy nor watched!", collection);
       }
     }
-
   }
 
   /**
@@ -490,23 +471,15 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   @SuppressWarnings({"unchecked"})
-  public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
-      InterruptedException {
+  public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException, InterruptedException {
     // We need to fetch the current cluster state and the set of live nodes
 
     log.debug("Updating cluster state from ZooKeeper... ");
 
-    // Sanity check ZK structure.
-    if (!zkClient.exists(CLUSTER_STATE, true)) {
-      throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-          "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
-    }
-
     // on reconnect of SolrZkClient force refresh and re-add watches.
     loadClusterProperties();
     refreshLiveNodes(new LiveNodeWatcher());
-    refreshLegacyClusterState(new LegacyClusterStateWatcher());
-    refreshStateFormat2Collections();
+    refreshCollections();
     refreshCollectionList(new CollectionsChildWatcher());
     refreshAliases(aliasesManager);
 
@@ -582,13 +555,11 @@ public class ZkStateReader implements SolrCloseable {
 
     Set<String> liveNodes = this.liveNodes; // volatile read
 
-    // Legacy clusterstate is authoritative, for backwards compatibility.
-    // To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
-    Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>(legacyCollectionStates);
+    Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>();
 
-    // Add state format2 collections, but don't override legacy collection states.
+    // Add collections
     for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
-      result.putIfAbsent(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+      result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
     }
 
     // Finally, add any lazy collections that aren't already accounted for.
@@ -596,11 +567,10 @@ public class ZkStateReader implements SolrCloseable {
       result.putIfAbsent(entry.getKey(), entry.getValue());
     }
 
-    this.clusterState = new ClusterState(liveNodes, result, legacyClusterStateVersion);
+    this.clusterState = new ClusterState(result, liveNodes);
 
     if (log.isDebugEnabled()) {
-      log.debug("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
-          legacyCollectionStates.keySet().size(),
+      log.debug("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
           collectionWatches.keySet().size(),
           watchedCollectionStates.keySet().size(),
           lazyCollectionStates.keySet().size(),
@@ -608,8 +578,7 @@ public class ZkStateReader implements SolrCloseable {
     }
 
     if (log.isTraceEnabled()) {
-      log.trace("clusterStateSet: legacy [{}] interesting [{}] watched [{}] lazy [{}] total [{}]",
-          legacyCollectionStates.keySet(),
+      log.trace("clusterStateSet: interesting [{}] watched [{}] lazy [{}] total [{}]",
           collectionWatches.keySet(),
           watchedCollectionStates.keySet(),
           lazyCollectionStates.keySet(),
@@ -625,51 +594,9 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   /**
-   * Refresh legacy (shared) clusterstate.json
-   */
-  private void refreshLegacyClusterState(Watcher watcher) throws KeeperException, InterruptedException {
-    try {
-      final Stat stat = new Stat();
-      final byte[] data = zkClient.getData(CLUSTER_STATE, watcher, stat, true);
-      final ClusterState loadedData = ClusterState.load(stat.getVersion(), data, emptySet(), CLUSTER_STATE);
-      synchronized (getUpdateLock()) {
-        if (this.legacyClusterStateVersion >= stat.getVersion()) {
-          // Nothing to do, someone else updated same or newer.
-          return;
-        }
-        Set<String> updatedCollections = new HashSet<>();
-        for (String coll : this.collectionWatches.keySet()) {
-          ClusterState.CollectionRef ref = this.legacyCollectionStates.get(coll);
-          // legacy collections are always in-memory
-          DocCollection oldState = ref == null ? null : ref.get();
-          ClusterState.CollectionRef newRef = loadedData.getCollectionStates().get(coll);
-          DocCollection newState = newRef == null ? null : newRef.get();
-          if (newState == null) {
-            // check that we haven't just migrated
-            newState = watchedCollectionStates.get(coll);
-          }
-          if (!Objects.equals(oldState, newState)) {
-            updatedCollections.add(coll);
-          }
-        }
-        this.legacyCollectionStates = loadedData.getCollectionStates();
-        this.legacyClusterStateVersion = stat.getVersion();
-        constructState(updatedCollections);
-      }
-    } catch (KeeperException.NoNodeException e) {
-      // Ignore missing legacy clusterstate.json.
-      synchronized (getUpdateLock()) {
-        this.legacyCollectionStates = emptyMap();
-        this.legacyClusterStateVersion = 0;
-        constructState(Collections.emptySet());
-      }
-    }
-  }
-
-  /**
-   * Refresh state format2 collections.
+   * Refresh collections.
    */
-  private void refreshStateFormat2Collections() {
+  private void refreshCollections() {
     for (String coll : collectionWatches.keySet()) {
       new StateWatcher(coll).refreshAndWatch();
     }
@@ -679,17 +606,7 @@ public class ZkStateReader implements SolrCloseable {
   private final Object refreshCollectionListLock = new Object();
 
   /**
-   * Search for any lazy-loadable state format2 collections.
-   * <p>
-   * A stateFormat=1 collection which is not interesting to us can also
-   * be put into the {@link #lazyCollectionStates} map here. But that is okay
-   * because {@link #constructState(Set)} will give priority to collections in the
-   * shared collection state over this map.
-   * In fact this is a clever way to avoid doing a ZK exists check on
-   * the /collections/collection_name/state.json znode
-   * Such an exists check is done in {@link ClusterState#hasCollection(String)} and
-   * {@link ClusterState#getCollectionsMap()} methods
-   * have a safeguard against exposing wrong collection names to the users
+   * Search for any lazy-loadable collections.
    */
   private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
     synchronized (refreshCollectionListLock) {
@@ -763,7 +680,6 @@ public class ZkStateReader implements SolrCloseable {
 
   private Set<String> getCurrentCollections() {
     Set<String> collections = new HashSet<>();
-    collections.addAll(legacyCollectionStates.keySet());
     collections.addAll(watchedCollectionStates.keySet());
     collections.addAll(lazyCollectionStates.keySet());
     return collections;
@@ -1358,44 +1274,6 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   /**
-   * Watches the legacy clusterstate.json.
-   */
-  class LegacyClusterStateWatcher implements Watcher {
-
-    @Override
-    public void process(WatchedEvent event) {
-      // session events are not change events, and do not remove the watcher
-      if (EventType.None.equals(event.getType())) {
-        return;
-      }
-      int liveNodesSize = ZkStateReader.this.clusterState == null ? 0 : ZkStateReader.this.clusterState.getLiveNodes().size();
-      log.debug("A cluster state change: [{}], has occurred - updating... (live nodes size: [{}])", event, liveNodesSize);
-      refreshAndWatch();
-    }
-
-    /**
-     * Must hold {@link #getUpdateLock()} before calling this method.
-     */
-    public void refreshAndWatch() {
-      try {
-        refreshLegacyClusterState(this);
-      } catch (KeeperException.NoNodeException e) {
-        throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-            "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
-      } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
-        log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK: [{}]", e.getMessage());
-      } catch (KeeperException e) {
-        log.error("A ZK error has occurred", e);
-        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "A ZK error has occurred", e);
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        log.warn("Interrupted", e);
-      }
-    }
-  }
-
-  /**
    * Watches collection properties
    */
   class PropsWatcher implements Watcher {
@@ -1572,8 +1450,7 @@ public class ZkStateReader implements SolrCloseable {
       try {
         Stat stat = new Stat();
         byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
-        ClusterState state = ClusterState.load(stat.getVersion(), data,
-            Collections.<String>emptySet(), collectionPath);
+        ClusterState state = ClusterState.createFromJson(stat.getVersion(), data, Collections.emptySet());
         ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
         return collectionRef == null ? null : collectionRef.get();
       } catch (KeeperException.NoNodeException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index 3e8ee37..8e8a027 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -77,7 +77,7 @@ public interface CollectionParams {
    * <p>Some of these actions are also used over the cluster state update queue at <code>/overseer/queue</code> and have a
    * different (though related) meaning there. These actions are:
    * {@link #CREATE}, {@link #DELETE}, {@link #CREATESHARD}, {@link #DELETESHARD}, {@link #ADDREPLICA}, {@link #ADDREPLICAPROP},
-   * {@link #DELETEREPLICAPROP}, {@link #BALANCESHARDUNIQUE}, {@link #MODIFYCOLLECTION} and {@link #MIGRATESTATEFORMAT}.</p>
+   * {@link #DELETEREPLICAPROP}, {@link #BALANCESHARDUNIQUE} and {@link #MODIFYCOLLECTION}.</p>
    */
   enum CollectionAction {
     CREATE(true, LockLevel.COLLECTION),
@@ -112,7 +112,6 @@ public interface CollectionParams {
     BALANCESHARDUNIQUE(true, LockLevel.SHARD),
     REBALANCELEADERS(true, LockLevel.COLLECTION),
     MODIFYCOLLECTION(true, LockLevel.COLLECTION),
-    MIGRATESTATEFORMAT(true, LockLevel.CLUSTER),
     BACKUP(true, LockLevel.COLLECTION),
     RESTORE(true, LockLevel.COLLECTION),
     CREATESNAPSHOT(true, LockLevel.COLLECTION),
diff --git a/solr/solrj/src/resources/apispec/cluster.Commands.json b/solr/solrj/src/resources/apispec/cluster.Commands.json
index 069cd1d..b72b67c 100644
--- a/solr/solrj/src/resources/apispec/cluster.Commands.json
+++ b/solr/solrj/src/resources/apispec/cluster.Commands.json
@@ -75,9 +75,6 @@
       "documentation": "https://lucene.apache.org/solr/guide/cluster-node-management.html#clusterprop",
       "description": "Add, edit, or delete a cluster-wide property.",
       "properties": {
-        "legacyCloud": {
-          "type": "boolean"
-        },
         "urlScheme": {
           "type": "string"
         },
diff --git a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json
index ae7c36a..50c1e3b 100644
--- a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json
+++ b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json
@@ -1,6 +1,6 @@
 {
   "documentation": "https://lucene.apache.org/solr/guide/shard-management.html#deleteshard",
-  "description": "Deletes a shard by unloading all replicas of the shard, removing it from clusterstate.json, and by default deleting the instanceDir and dataDir. Only inactive shards or those which have no range for custom sharding will be deleted.",
+  "description": "Deletes a shard by unloading all replicas of the shard, removing it from the collection's state.json, and by default deleting the instanceDir and dataDir. Only inactive shards or those which have no range for custom sharding will be deleted.",
   "methods": [
     "DELETE"
   ],
diff --git a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json
index 2d4691d..16efecb 100644
--- a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json
+++ b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json
@@ -1,6 +1,6 @@
 {
   "documentation": "https://lucene.apache.org/solr/guide/replica-management.html#deletereplica",
-  "description": "Deletes a replica. If the responding node is up, the core is unloaded, the entry removed from clusterstate.json, and the instanceDir and dataDir removed. If the node is not up, the entry for the replica is removed from clusterstate.json; if the nodes comes up later, the replica is automatically de-registered.",
+  "description": "Deletes a replica. If the responding node is up, the core is unloaded, the entry removed from the collection's state.json, and the instanceDir and dataDir removed. If the node is not up, the entry for the replica is removed from state.json; if the node comes up later, the replica is automatically de-registered.",
   "methods": [
     "DELETE"
   ],
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index 0b3c31e..8e69147 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -82,7 +82,6 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.TestPolicy2.loadFro
 import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORES;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.FREEDISK;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.REPLICA;
-import static org.apache.solr.common.cloud.ZkStateReader.CLUSTER_STATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 
@@ -141,9 +140,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
 
 
   public void testWithCollection() {
-    ClusterState clusterState = ClusterState.load(1,
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1,
         (Map) loadFromResource("testWithCollection.json"),
-        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"), CLUSTER_STATE);
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() throws IOException {
@@ -233,9 +232,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
 
   public void testWithCollectionSuggestions() {
     ClusterState clusterState =
-        ClusterState.load(1,
+        ClusterState.createFromCollectionMap(1,
             (Map) loadFromResource("testWithCollectionSuggestions.json"),
-            ImmutableSet.of("node1", "node2", "node3", "node4", "node5"), CLUSTER_STATE);
+            ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() throws IOException {
@@ -324,11 +323,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
   }
 
   public void testWithCollectionMoveVsAddSuggestions() throws IOException {
-    ClusterState clusterState = ClusterState.load(1,
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1,
         (Map) loadFromResource("testWithCollectionMoveVsAddSuggestions.json"),
-        ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6"),
-        CLUSTER_STATE
-    );
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() {
@@ -432,9 +429,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
   }
 
   public void testWithCollectionMoveReplica() {
-    ClusterState clusterState = ClusterState.load(1,
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1,
         (Map) loadFromResource("testWithCollectionMoveReplica.json"),
-        ImmutableSet.of("node2", "node3", "node4", "node5"), CLUSTER_STATE);
+        ImmutableSet.of("node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() throws IOException {
@@ -808,7 +805,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
         "}";
 
 
-    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+    ClusterState clusterState = ClusterState.createFromJson(1, clusterStateStr.getBytes(UTF_8),
         ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
@@ -1190,7 +1187,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
         return new DelegatingClusterStateProvider(null) {
           @Override
           public ClusterState getClusterState() throws IOException {
-            return ClusterState.load(0, new HashMap<>(), getLiveNodes(), CLUSTER_STATE);
+            return ClusterState.createFromCollectionMap(0, new HashMap<>(), getLiveNodes());
           }
 
           @Override
@@ -2937,8 +2934,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
           }
 
           @Override
-          public ClusterState getClusterState() throws IOException {
-            return ClusterState.load(0, clusterState, getLiveNodes(), ZkStateReader.getCollectionPath("c1"));
+          public ClusterState getClusterState() {
+            return ClusterState.createFromCollectionMap(0, clusterState, getLiveNodes());
           }
 
           @Override
@@ -2988,7 +2985,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     Map clusterStateMap = (Map) m.remove("clusterstate");
     Map replicaInfoMap = (Map) m.remove("replicaInfo");
 
-    ClusterState clusterState = ClusterState.load(1, clusterStateMap, ImmutableSet.of("node1", "node2"), CLUSTER_STATE);
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1, clusterStateMap, ImmutableSet.of("node1", "node2"));
 
     List<String> shards = Arrays.asList("shard1", "shard2", "shard3");
 
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java
index 63b7da4..29605b5 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java
@@ -177,7 +177,7 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
   static SolrCloudManager createCloudManager(Map m, Map meta) {
     Map nodeVals = (Map) meta.get("nodeValues");
     List<Map> replicaVals = (List<Map>) meta.get("replicaValues");
-    ClusterState clusterState = ClusterState.load(0, m, Collections.emptySet(), null);
+    ClusterState clusterState = ClusterState.createFromCollectionMap(0, m, Collections.emptySet());
     Map<String, AtomicInteger> coreCount = new LinkedHashMap<>();
     Set<String> nodes = new HashSet<>(nodeVals.keySet());
     clusterState.getCollectionStates().forEach((s, collectionRef) -> collectionRef.get()
@@ -307,7 +307,7 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
         if (clusterState == null) {
           Map map = (Map) getObjectByPath(m, false, "cluster/collections");
           if (map == null) map = new HashMap<>();
-          clusterState = ClusterState.load(0, map, liveNodes, "/clusterstate.json");
+          clusterState = ClusterState.createFromCollectionMap(0, map, liveNodes);
         }
 
         return new DelegatingClusterStateProvider(null) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 92c5c62..3b1d262 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -84,8 +84,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
         .withLBHttpSolrClient(mockLbclient)
         .build()) {
       livenodes.addAll(ImmutableSet.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
-      ClusterState cs = ClusterState.load(1, coll1State.getBytes(UTF_8),
-          Collections.emptySet(), "/collections/gettingstarted/state.json");
+      ClusterState cs = ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet());
       refs.put(collName, new Ref(collName));
       colls.put(collName, cs.getCollectionOrNull(collName));
       responses.put("request", o -> {
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
index 1ef806e..b0de383 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
@@ -193,114 +193,4 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
 
     client.getZkStateReader().removeCloudCollectionsListener(watcher1);
   }
-
-  @Test
-  // commented out on: 24-Dec-2018   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 17-Aug-2018
-  public void testWatchesWorkForBothStateFormats() throws Exception {
-    CloudSolrClient client = cluster.getSolrClient();
-
-    Map<Integer, Set<String>> oldResults = new HashMap<>();
-    Map<Integer, Set<String>> newResults = new HashMap<>();
-
-    CloudCollectionsListener watcher1 = (oldCollections, newCollections) -> {
-      log.info("New set of collections: {}, {}", oldCollections, newCollections);
-      oldResults.put(1, oldCollections);
-      newResults.put(1, newCollections);
-    };
-    client.getZkStateReader().registerCloudCollectionsListener(watcher1);
-    CloudCollectionsListener watcher2 = (oldCollections, newCollections) -> {
-      log.info("New set of collections: {}, {}", oldCollections, newCollections);
-      oldResults.put(2, oldCollections);
-      newResults.put(2, newCollections);
-    };
-    client.getZkStateReader().registerCloudCollectionsListener(watcher2);
-
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after registration", 0, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after registration", 0, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener has new collections with size > 0 after registration", 0, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener has new collections with size > 0 after registration", 0, newResults.get(2).size());
-
-    // Creating old state format collection
-
-    CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
-        .setStateFormat(1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    cluster.waitForActiveCollection("testcollection1", 4, 4);
-
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after collection created with old stateFormat", 0, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after collection created with old stateFormat", 0, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 1, newResults.get(1).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(1).contains("testcollection1"));
-    assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 1, newResults.get(2).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(2).contains("testcollection1"));
-
-    // Creating new state format collection
-
-    CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    cluster.waitForActiveCollection("testcollection2", 4, 4);
-
-    assertEquals("CloudCollectionsListener has incorrect old collections after collection created with new stateFormat", 1, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener has incorrect old collections after collection created with new stateFormat", 1, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener not updated with created collection with new stateFormat", 2, newResults.get(1).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with new stateFormat", newResults.get(1).contains("testcollection2"));
-    assertEquals("CloudCollectionsListener not updated with created collection with new stateFormat", 2, newResults.get(2).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with new stateFormat", newResults.get(2).contains("testcollection2"));
-
-    client.getZkStateReader().removeCloudCollectionsListener(watcher2);
-
-    // Creating old state format collection
-
-    CollectionAdminRequest.createCollection("testcollection3", "config", 4, 1)
-        .setStateFormat(1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    cluster.waitForActiveCollection("testcollection3", 4, 4);
-
-    assertEquals("CloudCollectionsListener has incorrect old collections after collection created with old stateFormat", 2, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener updated after removal", 1, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 3, newResults.get(1).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(1).contains("testcollection3"));
-    assertEquals("CloudCollectionsListener updated after removal", 2, newResults.get(2).size());
-    assertFalse("CloudCollectionsListener updated after removal", newResults.get(2).contains("testcollection3"));
-
-    // Adding back listener
-    client.getZkStateReader().registerCloudCollectionsListener(watcher2);
-
-    assertEquals("CloudCollectionsListener has old collections after registration", 0, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener doesn't have all collections after registration", 3, newResults.get(2).size());
-
-    // Deleting old state format collection
-
-    CollectionAdminRequest.deleteCollection("testcollection1").processAndWait(client, MAX_WAIT_TIMEOUT);
-
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 3, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 3, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 2, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 2, newResults.get(2).size());
-    assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(1).contains("testcollection1"));
-    assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(2).contains("testcollection1"));
-
-    CollectionAdminRequest.deleteCollection("testcollection2").processAndWait(client, MAX_WAIT_TIMEOUT);
-
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 2, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 2, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 1, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 1, newResults.get(2).size());
-    assertFalse("CloudCollectionsListener not updated with deleted collection with new stateFormat", newResults.get(1).contains("testcollection2"));
-    assertFalse("CloudCollectionsListener not updated with deleted collection with new stateFormat", newResults.get(2).contains("testcollection2"));
-
-    client.getZkStateReader().removeCloudCollectionsListener(watcher1);
-
-    CollectionAdminRequest.deleteCollection("testcollection3").processAndWait(client, MAX_WAIT_TIMEOUT);
-
-    assertEquals("CloudCollectionsListener updated after removal", 2, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 1, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener updated after removal", 1, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 0, newResults.get(2).size());
-    assertTrue("CloudCollectionsListener updated after removal", newResults.get(1).contains("testcollection3"));
-    assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(2).contains("testcollection3"));
-
-    client.getZkStateReader().removeCloudCollectionsListener(watcher2);
-  }
-
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index ab3dc95..8c19f3e 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -337,27 +337,4 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
             () -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
     
   }
-
-  @Test
-  public void testWatchesWorkForStateFormat1() throws Exception {
-
-    final CloudSolrClient client = cluster.getSolrClient();
-
-    Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                              (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
-
-    CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation",
-               future.get());
-
-    Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                                (n, c) -> c != null && c.getStateFormat() == 2);
-
-    CollectionAdminRequest.migrateCollectionFormat("stateformat1")
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
-
-  }
-
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
index 22d687e..f22c7cd 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
@@ -264,30 +264,4 @@ public class TestDocCollectionWatcher extends SolrCloudTestCase {
 
     assertTrue("DocCollectionWatcher not notified of delete call", future.get());
   }
-  
-  @Test
-  public void testWatchesWorkForStateFormat1() throws Exception {
-
-    final CloudSolrClient client = cluster.getSolrClient();
-
-    Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                              (c) -> (null != c) );
-
-    CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
-    client.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
-    
-    assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation",
-               future.get());
-
-    Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                                (c) -> c != null && c.getStateFormat() == 2);
-
-    CollectionAdminRequest.migrateCollectionFormat("stateformat1")
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("DocCollectionWatcher did not persist over state format migration", migrated.get());
-
-  }
-
 }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index cadf36a..24c1fff 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -383,16 +383,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     }
   }
 
-  protected String defaultStateFormat = String.valueOf( 1 + random().nextInt(2));
-
-  protected String getStateFormat()  {
-    String stateFormat = System.getProperty("tests.solr.stateFormat", null);
-    if (stateFormat != null)  {
-      defaultStateFormat = stateFormat;
-    }
-    return defaultStateFormat; // random
-  }
-
   protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
     List<JettySolrRunner> jettys = Collections.synchronizedList(new ArrayList<>());
     List<SolrClient> clients = Collections.synchronizedList(new ArrayList<>());
@@ -408,7 +398,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     // jetty instances are started)
     assertEquals(0, CollectionAdminRequest
                  .createCollection(DEFAULT_COLLECTION, "conf1", sliceCount, 1) // not real rep factor!
-                 .setStateFormat(Integer.parseInt(getStateFormat()))
                  .setCreateNodeSet("") // empty node set prevents creation of cores
                  .process(cloudClient).getStatus());
     
@@ -1812,10 +1801,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
       collectionInfos.put(collectionName, list);
     }
     params.set("name", collectionName);
-    if ("1".equals(getStateFormat()) ) {
-      log.info("Creating collection with stateFormat=1: {}", collectionName);
-      params.set(DocCollection.STATE_FORMAT, "1");
-    }
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index b1af9d9..6f68499 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -21,7 +21,6 @@ import com.google.common.util.concurrent.AtomicLongMap;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.TimeSource;
@@ -846,9 +845,6 @@ public class ZkTestServer {
     ops.add(Op.create(path, null, chRootClient.getZkACLProvider().getACLsToAdd(path),  CreateMode.PERSISTENT));
     chRootClient.multi(ops, true);
 
-    // this workaround is acceptable until we remove legacyCloud because we just init a single core here
-    String defaultClusterProps = "{\""+ZkStateReader.LEGACY_CLOUD+"\":\"true\"}";
-    chRootClient.makePath(ZkStateReader.CLUSTER_PROPS, defaultClusterProps.getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true);
     // for now, always upload the config and schema to the canonical names
     putConfig("conf1", chRootClient, solrhome, config, "solrconfig.xml");
     putConfig("conf1", chRootClient, solrhome, schema, "schema.xml");