You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2020/09/14 09:41:50 UTC

[lucene-solr] 03/03: SOLR-12823: remove /clusterstate.json (#1528)

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch jira/solr-14830-legacy-removal
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit c4ff30ccc24d92bb96b031f764f0b4c01b898d41
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Mon Sep 14 15:09:58 2020 +0530

    SOLR-12823: remove /clusterstate.json (#1528)
    
    Original commit on master:
    
    commit 1ab9b811c65abb3d1a827c87b4f1135116ff90eb
    Author: murblanc <43...@users.noreply.github.com>
    Date:   Tue Jun 9 20:59:17 2020 +0200
    
        SOLR-12823: remove /clusterstate.json (#1528)
---
 solr/CHANGES.txt                                   |   3 +
 .../src/java/org/apache/solr/cloud/Overseer.java   |  14 +-
 .../java/org/apache/solr/cloud/ZkController.java   | 133 ++++++++------
 .../solr/cloud/api/collections/AddReplicaCmd.java  |  47 ++---
 .../solr/cloud/api/collections/BackupCmd.java      |   4 +-
 .../cloud/api/collections/CreateCollectionCmd.java |  98 +++++-----
 .../OverseerCollectionMessageHandler.java          |  46 +----
 .../api/collections/ReindexCollectionCmd.java      |   2 -
 .../solr/cloud/api/collections/RestoreCmd.java     |  12 +-
 .../autoscaling/sim/SimClusterStateProvider.java   |   4 +-
 .../sim/SnapshotClusterStateProvider.java          |  12 +-
 .../solr/cloud/overseer/ClusterStateMutator.java   |  17 +-
 .../solr/cloud/overseer/CollectionMutator.java     |   2 +-
 .../apache/solr/cloud/overseer/ReplicaMutator.java |  44 ++---
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  21 ++-
 .../java/org/apache/solr/core/CoreContainer.java   |   1 +
 .../org/apache/solr/core/backup/BackupManager.java |   2 +-
 .../apache/solr/handler/admin/ClusterStatus.java   |  32 +---
 .../org/apache/solr/handler/admin/ColStatus.java   |   1 -
 .../solr/handler/admin/CollectionsHandler.java     |  10 +-
 .../solr/handler/admin/ZookeeperInfoHandler.java   | 200 ++++++++++-----------
 .../src/java/org/apache/solr/util/SolrCLI.java     |   1 -
 .../apache/solr/cloud/ClusterStateMockUtil.java    |   2 +-
 .../org/apache/solr/cloud/ClusterStateTest.java    |   8 +-
 .../org/apache/solr/cloud/CollectionPropsTest.java |   4 -
 ...mat2Test.java => CollectionStateZnodeTest.java} |   5 +-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  42 ++---
 .../apache/solr/cloud/CreateRoutedAliasTest.java   |   2 -
 .../solr/cloud/DeleteInactiveReplicaTest.java      |   2 -
 .../org/apache/solr/cloud/DeleteReplicaTest.java   |  36 +---
 .../solr/cloud/LegacyCloudClusterPropTest.java     | 180 -------------------
 .../org/apache/solr/cloud/MigrateRouteKeyTest.java |   5 -
 .../OverseerCollectionConfigSetProcessorTest.java  |   2 -
 .../test/org/apache/solr/cloud/OverseerTest.java   | 153 ++++++++--------
 .../apache/solr/cloud/ShardRoutingCustomTest.java  |   3 -
 .../cloud/SharedFSAutoReplicaFailoverTest.java     |   5 -
 .../test/org/apache/solr/cloud/SliceStateTest.java |   4 +-
 .../apache/solr/cloud/TestClusterProperties.java   |  10 +-
 .../org/apache/solr/cloud/TestPullReplica.java     |   5 -
 .../solr/cloud/TestPullReplicaErrorHandling.java   |  15 --
 .../org/apache/solr/cloud/TestTlogReplica.java     |   5 -
 .../test/org/apache/solr/cloud/TestZkChroot.java   | 157 ----------------
 .../org/apache/solr/cloud/ZkControllerTest.java    |   3 +-
 .../AbstractCloudBackupRestoreTestCase.java        |   2 -
 .../CollectionsAPIAsyncDistributedZkTest.java      |  17 +-
 .../solr/cloud/api/collections/ShardSplitTest.java |  15 --
 .../SimpleCollectionCreateDeleteTest.java          |   6 +-
 .../cloud/api/collections/TestCollectionAPI.java   |  29 +--
 .../cloud/overseer/TestClusterStateMutator.java    |   4 +-
 .../overseer/ZkCollectionPropsCachingTest.java     |   4 -
 .../solr/cloud/overseer/ZkStateReaderTest.java     | 110 +-----------
 .../solr/cloud/overseer/ZkStateWriterTest.java     | 148 ++-------------
 .../core/snapshots/TestSolrCloudSnapshots.java     |   2 +-
 .../solr/handler/admin/TestCollectionAPIs.java     |   8 +-
 .../component/TestHttpShardHandlerFactory.java     |   2 +-
 .../src/test/org/apache/solr/util/TestUtils.java   |   2 +-
 .../src/cluster-node-management.adoc               |  21 +--
 solr/solr-ref-guide/src/collection-management.adoc |   2 -
 .../src/major-changes-in-solr-9.adoc               |  10 ++
 .../src/rule-based-replica-placement.adoc          |   2 +-
 solr/solr-ref-guide/src/shard-management.adoc      |   2 +-
 .../client/solrj/cloud/autoscaling/Policy.java     |   8 +-
 .../client/solrj/impl/BaseCloudSolrClient.java     |  23 ++-
 .../solrj/impl/BaseHttpClusterStateProvider.java   |   3 +-
 .../solrj/request/CollectionAdminRequest.java      |  36 ----
 .../org/apache/solr/common/cloud/ClusterState.java |  34 ++--
 .../apache/solr/common/cloud/DocCollection.java    |  18 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  10 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |  46 ++---
 .../solr/common/params/CollectionParams.java       |   3 +-
 .../src/resources/apispec/cluster.Commands.json    |   3 -
 ...collections.collection.shards.shard.delete.json |   2 +-
 ...ons.collection.shards.shard.replica.delete.json |   2 +-
 .../client/solrj/cloud/autoscaling/TestPolicy.java |  29 ++-
 .../solrj/cloud/autoscaling/TestPolicy2.java       |   4 +-
 .../solrj/impl/CloudSolrClientCacheTest.java       |   3 +-
 .../cloud/TestCloudCollectionsListeners.java       | 110 ------------
 .../common/cloud/TestCollectionStateWatchers.java  |  20 ---
 .../common/cloud/TestDocCollectionWatcher.java     |  23 ---
 .../solr/cloud/AbstractFullDistribZkTestBase.java  |  15 --
 .../java/org/apache/solr/cloud/ZkTestServer.java   |   4 +-
 81 files changed, 549 insertions(+), 1587 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b65b558..a02bd07 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -74,6 +74,9 @@ Other Changes
 * SOLR-14486: Autoscaling simulation framework no longer creates /clusterstate.json (format 1),
   instead it creates individual per-collection /state.json files (format 2). (ab)
 
+ * SOLR-12823: Remove /clusterstate.json support, i.e. support for collections created with stateFormat=1,
+   as well as support for the Collection API MIGRATESTATEFORMAT action and for the legacyCloud flag (Ilan Ginzburg).
+
 ==================  8.6.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 0de28c8..52b7d78 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -20,7 +20,7 @@ import org.apache.lucene.util.Version;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.cloud.api.collections.CreateCollectionCmd;
@@ -479,8 +479,6 @@ public class Overseer implements SolrCloseable {
             CollectionsHandler.verifyRuleParams(zkController.getCoreContainer(), message.getProperties());
             ZkWriteCommand zkwrite = new CollectionMutator(getSolrCloudManager()).modifyCollection(clusterState, message);
             return Collections.singletonList(zkwrite);
-          case MIGRATESTATEFORMAT:
-            return Collections.singletonList(new ClusterStateMutator(getSolrCloudManager()).migrateStateFormat(clusterState, message));
           default:
             throw new RuntimeException("unknown operation:" + operation
                     + " contents:" + message.getProperties());
@@ -1079,16 +1077,6 @@ public class Overseer implements SolrCloseable {
     return getCollectionQueue(zkClient, zkStats);
   }
   
-  public static boolean isLegacy(ZkStateReader stateReader) {
-    String legacyProperty = stateReader.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
-    return "true".equals(legacyProperty);
-  }
-
-  public static boolean isLegacy(ClusterStateProvider clusterStateProvider) {
-    String legacyProperty = clusterStateProvider.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
-    return "true".equals(legacyProperty);
-  }
-
   public ZkStateReader getZkStateReader() {
     return reader;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 82d1df3..cde88ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -113,6 +113,7 @@ import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
@@ -553,6 +554,41 @@ public class ZkController implements Closeable {
     init();
   }
 
+  /**
+   * <p>Verifies if /clusterstate.json exists in ZooKeeper, and if it does and is not empty, refuses to start and outputs
+   * a helpful message regarding collection migration.</p>
+   *
+   * <p>If /clusterstate.json exists and is empty, it is removed.</p>
+   */
+  private void checkNoOldClusterstate(final SolrZkClient zkClient) throws InterruptedException {
+    try {
+      if (!zkClient.exists(ZkStateReader.UNSUPPORTED_CLUSTER_STATE)) {
+        return;
+      }
+
+      final byte[] data = zkClient.getData(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, null, null);
+
+      if (Arrays.equals("{}".getBytes(StandardCharsets.UTF_8), data)) {
+        // Empty json. This log will only occur once.
+        log.warn("{} no longer supported starting with Solr 9. Found empty file on Zookeeper, deleting it.", ZkStateReader.UNSUPPORTED_CLUSTER_STATE);
+        zkClient.delete(ZkStateReader.UNSUPPORTED_CLUSTER_STATE, -1);
+      } else {
+        // /clusterstate.json not empty: refuse to start but do not automatically delete. A bit of a pain but user shouldn't
+        // have older collections at this stage anyway.
+        String message = ZkStateReader.UNSUPPORTED_CLUSTER_STATE + " no longer supported starting with Solr 9. "
+            + "It is present and not empty. Cannot start Solr. Please first migrate collections to stateFormat=2 using an "
+            + "older version of Solr or if you don't care about the data then delete the file from "
+            + "Zookeeper using a command line tool, for example: bin/solr zk rm /clusterstate.json -z host:port";
+        log.error(message);
+        throw new SolrException(SolrException.ErrorCode.INVALID_STATE, message);
+      }
+    } catch (KeeperException e) {
+      // Convert checked exception to one acceptable by the caller (see also init() further down)
+      log.error("", e);
+      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+    }
+  }
+
   public int getLeaderVoteWait() {
     return leaderVoteWait;
   }
@@ -864,7 +900,6 @@ public class ZkController implements Closeable {
     paths.put("/autoscaling/events/.scheduled_maintenance", null);
     paths.put("/autoscaling/events/.auto_add_replicas", null);
 //
-    paths.put(ZkStateReader.CLUSTER_STATE, emptyJson);
     //   operations.add(zkClient.createPathOp(ZkStateReader.CLUSTER_PROPS, emptyJson));
     paths.put(ZkStateReader.SOLR_PKGS_PATH, null);
     paths.put(ZkStateReader.ROLES, emptyJson);
@@ -1485,7 +1520,7 @@ public class ZkController implements Closeable {
       log.info("Register SolrCore, baseUrl={} collection={}, shard={} coreNodeName={}", baseUrl, collection, shardId, coreZkNodeName);
       // check replica's existence in clusterstate first
       try {
-        zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 10000 : 10000,
+        zkStateReader.waitForState(collection, 10000,
             TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
       } catch (TimeoutException e) {
         throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
@@ -1874,10 +1909,7 @@ public class ZkController implements Closeable {
       props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
       props.put(ZkStateReader.COLLECTION_PROP, collection);
       props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
-
-      if (!Overseer.isLegacy(zkStateReader)) {
-        props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
-      }
+      props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
       if (numShards != null) {
         props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
       }
@@ -2159,69 +2191,60 @@ public class ZkController implements Closeable {
 
   /**
    * On startup, the node already published all of its replicas as DOWN,
-   * so in case of legacyCloud=false ( the replica must already present on Zk )
    * we can skip publish the replica as down
    * @return Should publish the replica as down on startup
    */
   private boolean isPublishAsDownOnStartup(CloudDescriptor cloudDesc) {
-    if (!Overseer.isLegacy(zkStateReader)) {
       Replica replica = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName())
           .getSlice(cloudDesc.getShardId())
           .getReplica(cloudDesc.getCoreNodeName());
-      if (replica.getNodeName().equals(getNodeName())) {
-        return false;
-      }
-    }
-    return true;
+      return !replica.getNodeName().equals(getNodeName());
   }
 
   private void checkStateInZk(CoreDescriptor cd) throws InterruptedException, NotInClusterStateException {
-    if (!Overseer.isLegacy(zkStateReader)) {
-      CloudDescriptor cloudDesc = cd.getCloudDescriptor();
-      String nodeName = cloudDesc.getCoreNodeName();
+    CloudDescriptor cloudDesc = cd.getCloudDescriptor();
+    String nodeName = cloudDesc.getCoreNodeName();
+    if (nodeName == null) {
+      nodeName = cloudDesc.getCoreNodeName();
+      // verify that the repair worked.
       if (nodeName == null) {
-        nodeName = cloudDesc.getCoreNodeName();
-        // verify that the repair worked.
-        if (nodeName == null) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "No coreNodeName for " + cd);
-        }
-      }
-      final String coreNodeName = nodeName;
-
-      if (cloudDesc.getShardId() == null) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for " + cd);
+        throw new SolrException(ErrorCode.SERVER_ERROR, "No coreNodeName for " + cd);
       }
+    }
+    final String coreNodeName = nodeName;
 
-      AtomicReference<String> errorMessage = new AtomicReference<>();
-      AtomicReference<DocCollection> collectionState = new AtomicReference<>();
-      try {
-        zkStateReader.waitForState(cd.getCollectionName(), WAIT_FOR_STATE, TimeUnit.SECONDS, (c) -> {
-          collectionState.set(c);
-          if (c == null)
-            return false;
-          Slice slice = c.getSlice(cloudDesc.getShardId());
-          if (slice == null) {
-            errorMessage.set("Invalid shard: " + cloudDesc.getShardId());
-            return false;
-          }
-          Replica replica = slice.getReplica(coreNodeName);
-          if (replica == null) {
-            StringBuilder sb = new StringBuilder();
-            slice.getReplicas().stream().forEach(replica1 -> sb.append(replica1.getName() + " "));
-            errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
-                ", ignore the exception if the replica was deleted. Found: " + sb.toString());
-            return false;
-          }
-          return true;
-        });
-      } catch (TimeoutException e) {
-        String error = errorMessage.get();
-        if (error == null)
-          error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
-              ", ignore the exception if the replica was deleted" ;
+    if (cloudDesc.getShardId() == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for " + cd);
+    }
 
-        throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error + "\n" + getZkStateReader().getClusterState().getCollection(cd.getCollectionName()));
-      }
+    AtomicReference<String> errorMessage = new AtomicReference<>();
+    AtomicReference<DocCollection> collectionState = new AtomicReference<>();
+    try {
+      zkStateReader.waitForState(cd.getCollectionName(), WAIT_FOR_STATE, TimeUnit.SECONDS, (c) -> {
+        collectionState.set(c);
+        if (c == null)
+          return false;
+        Slice slice = c.getSlice(cloudDesc.getShardId());
+        if (slice == null) {
+          errorMessage.set("Invalid shard: " + cloudDesc.getShardId());
+          return false;
+        }
+        Replica replica = slice.getReplica(coreNodeName);
+        if (replica == null) {
+          StringBuilder sb = new StringBuilder();
+          slice.getReplicas().stream().forEach(replica1 -> sb.append(replica1.getName() + " "));
+          errorMessage.set("coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
+              ", ignore the exception if the replica was deleted. Found: " + sb.toString());
+          return false;
+        }
+        return true;
+      });
+    } catch (TimeoutException e) {
+      String error = errorMessage.get();
+      if (error == null)
+        error = "coreNodeName " + coreNodeName + " does not exist in shard " + cloudDesc.getShardId() +
+            ", ignore the exception if the replica was deleted" ;
+       throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error + "\n" + getZkStateReader().getClusterState().getCollection(cd.getCollectionName()));
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index c572ed9..44ae298 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -248,31 +248,32 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-    if (!Overseer.isLegacy(zkStateReader)) {
-      ZkNodeProps props = new ZkNodeProps(
-              Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
-              ZkStateReader.COLLECTION_PROP, collectionName,
-              ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
-              ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
-              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node),
-              ZkStateReader.NODE_NAME_PROP, createReplica.node,
-              ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
-      if (createReplica.coreNodeName != null) {
-        props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
-      }
-      if (!skipCreateReplicaInClusterState) {
-        try {
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-        } catch (Exception e) {
-          ParWork.propegateInterrupt(e);
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
-        }
+    ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+            ZkStateReader.COLLECTION_PROP, collectionName,
+            ZkStateReader.SHARD_ID_PROP, createReplica.sliceName,
+            ZkStateReader.CORE_NAME_PROP, createReplica.coreName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(createReplica.node),
+            ZkStateReader.NODE_NAME_PROP, createReplica.node,
+            ZkStateReader.REPLICA_TYPE, createReplica.replicaType.name());
+    if (createReplica.coreNodeName != null) {
+      props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, createReplica.coreNodeName);
+    }
+    if (!skipCreateReplicaInClusterState) {
+      try {
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+      } catch (Exception e) {
+        ParWork.propegateInterrupt(e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
       }
-      String coreUrl = ZkCoreNodeProps.getCoreUrl(props.getStr(ZkStateReader.BASE_URL_PROP), createReplica.coreName);;
-      params.set(CoreAdminParams.CORE_NODE_NAME,
-          ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreUrl), false).get(coreUrl).getName());
     }
+    String coreUrl = ZkCoreNodeProps.getCoreUrl(props.getStr(ZkStateReader.BASE_URL_PROP), createReplica.coreName);;
+    params.set(CoreAdminParams.CORE_NODE_NAME,
+        ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(coreUrl), false).get(coreUrl).getName());
+
+    params.set(CoreAdminParams.CORE_NODE_NAME,
+        ocmh.waitToSeeReplicasInState(collectionName, Collections.singletonList(createReplica.coreName), true).get(createReplica.coreName).getName());
 
     String configName = zkStateReader.readConfigName(collectionName);
     String routeKey = message.getStr(ShardParams._ROUTE_);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
index eb6c878..76663f5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/BackupCmd.java
@@ -113,8 +113,8 @@ public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
     String configName = ocmh.zkStateReader.readConfigName(collectionName);
     backupMgr.downloadConfigDir(location, backupName, configName);
 
-    //Save the collection's state. Can be part of the monolithic clusterstate.json or a individual state.json
-    //Since we don't want to distinguish we extract the state and back it up as a separate json
+    //Save the collection's state (coming from the collection's state.json)
+    //We extract the state and back it up as a separate json
     DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
     backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 8a4ce7c..804de84 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -159,8 +159,6 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
       final String async = message.getStr(ASYNC);
 
-      boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
-
       Map<String,String> collectionParams = new HashMap<>();
       Map<String,Object> collectionProps = message.getProperties();
       for (Map.Entry<String, Object> entry : collectionProps.entrySet()) {
@@ -171,7 +169,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
       createCollectionZkNode(stateManager, collectionName, collectionParams, configName);
 
-      OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
+      OverseerCollectionMessageHandler.createConfNode(stateManager, configName, collectionName);
 
       // nocommit
       for (String shardName : shardNames) {
@@ -264,31 +262,28 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
 
 
         String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
-        //in the new mode, create the replica in clusterstate prior to creating the core.
+        // create the replica in the collection's state.json in ZK prior to creating the core.
         // Otherwise the core creation fails
 
         log.info("Base url for replica={}", baseUrl);
 
-        if (!isLegacyCloud) {
-
-          ZkNodeProps props = new ZkNodeProps();
-          props.getProperties().putAll(message.getProperties());
-          ZkNodeProps addReplicaProps = new ZkNodeProps(
-                  Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
-                  ZkStateReader.COLLECTION_PROP, collectionName,
-                  ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
-                  ZkStateReader.CORE_NAME_PROP, coreName,
-                  ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-                  ZkStateReader.BASE_URL_PROP, baseUrl,
-                  ZkStateReader.NODE_NAME_PROP, nodeName,
-                  ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
-                  ZkStateReader.NUM_SHARDS_PROP, message.getStr(ZkStateReader.NUM_SHARDS_PROP),
-                      "shards", message.getStr("shards"),
-                  CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-          props.getProperties().putAll(addReplicaProps.getProperties());
-          if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
-          ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
-        }
+        ZkNodeProps props = new ZkNodeProps();
+        props.getProperties().putAll(message.getProperties());
+        ZkNodeProps addReplicaProps = new ZkNodeProps(
+                Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+                ZkStateReader.COLLECTION_PROP, collectionName,
+                ZkStateReader.SHARD_ID_PROP, replicaPosition.shard,
+                ZkStateReader.CORE_NAME_PROP, coreName,
+                ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+                ZkStateReader.BASE_URL_PROP, baseUrl,
+                ZkStateReader.NODE_NAME_PROP, nodeName,
+                ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
+                ZkStateReader.NUM_SHARDS_PROP, message.getStr(ZkStateReader.NUM_SHARDS_PROP),
+                    "shards", message.getStr("shards"),
+                CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
+        props.getProperties().putAll(addReplicaProps.getProperties());
+        if (log.isDebugEnabled()) log.debug("Sending state update to populate clusterstate with new replica {}", props);
+        ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
 
         // Need to create new params for each request
         ModifiableSolrParams params = new ModifiableSolrParams();
@@ -317,39 +312,32 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
         sreq.actualShards = sreq.shards;
         sreq.params = params;
 
-        if (isLegacyCloud) {
-          log.info("Submit request to shard for legacyCloud for replica={}", baseUrl);
-          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
-        } else {
-          coresToCreate.put(coreName, sreq);
-        }
+        coresToCreate.put(coreName, sreq);
       }
 
-      if(!isLegacyCloud) {
-        // wait for all replica entries to be created
-        Map<String,Replica> replicas = new HashMap<>();
-        zkStateReader.waitForState(collectionName, 10, TimeUnit.SECONDS, (n, c) -> c != null && c.getSlices().size() == shardNames.size());
-        zkStateReader.waitForState(collectionName, 5, TimeUnit.SECONDS, expectedReplicas(coresToCreate.size(), replicas)); // nocommit - timeout - keep this below containing timeouts - need central timeout stuff
-        // TODO what if replicas comes back wrong?
-        if (replicas.size() > 0) {
-          for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
-            ShardRequest sreq = e.getValue();
-            for (Replica rep : replicas.values()) {
-              if (rep.getCoreName().equals(sreq.params.get(CoreAdminParams.NAME)) && rep.getBaseUrl().equals(sreq.shards[0])) {
-                sreq.params.set(CoreAdminParams.CORE_NODE_NAME, rep.getName());
-              }
+      // wait for all replica entries to be created
+      Map<String,Replica> replicas = new HashMap<>();
+      zkStateReader.waitForState(collectionName, 10, TimeUnit.SECONDS, (n, c) -> c != null && c.getSlices().size() == shardNames.size());
+      zkStateReader.waitForState(collectionName, 5, TimeUnit.SECONDS, expectedReplicas(coresToCreate.size(), replicas)); // nocommit - timeout - keep this below containing timeouts - need central timeout stuff
+      // TODO what if replicas comes back wrong?
+      if (replicas.size() > 0) {
+        for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
+          ShardRequest sreq = e.getValue();
+          for (Replica rep : replicas.values()) {
+            if (rep.getCoreName().equals(sreq.params.get(CoreAdminParams.NAME)) && rep.getBaseUrl().equals(sreq.shards[0])) {
+              sreq.params.set(CoreAdminParams.CORE_NODE_NAME, rep.getName());
             }
-            Replica replica = replicas.get(e.getKey());
+          }
+          Replica replica = replicas.get(e.getKey());
 
-            if (replica != null) {
-              String coreNodeName = replica.getName();
-              sreq.params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
-              log.info("Set the {} for replica {} to {}", CoreAdminParams.CORE_NODE_NAME, replica, coreNodeName);
-            }
-           
-            log.info("Submit request to shard for for replica={}", sreq.actualShards != null ? Arrays.asList(sreq.actualShards) : "null");
-            shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+          if (replica != null) {
+            String coreNodeName = replica.getName();
+            sreq.params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+            log.info("Set the {} for replica {} to {}", CoreAdminParams.CORE_NODE_NAME, replica, coreNodeName);
           }
+           
+          log.info("Submit request to shard for for replica={}", sreq.actualShards != null ? Arrays.asList(sreq.actualShards) : "null");
+          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
         }
       }
 
@@ -573,12 +561,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       }
     }
 
-    // TODO default to 2; but need to debug why BasicDistributedZk2Test fails early on
-    String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? ZkStateReader.CLUSTER_STATE
-            : ZkStateReader.getCollectionPath(cName);
-
     DocCollection newCollection = new DocCollection(cName,
-            slices, collectionProps, router, -1, znode);
+            slices, collectionProps, router, -1);
 
     return newCollection;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index a6d1ca2..053271a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -113,7 +113,6 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.DE
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESNAPSHOT;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MAINTAINROUTEDALIAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_COLL_TASK;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_REPLICA_TASK;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOCK_SHARD_TASK;
@@ -250,7 +249,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         .put(MOCK_COLL_TASK, this::mockOperation)
         .put(MOCK_SHARD_TASK, this::mockOperation)
         .put(MOCK_REPLICA_TASK, this::mockOperation)
-        .put(MIGRATESTATEFORMAT, this::migrateStateFormat)
         .put(CREATESHARD, new CreateShardCmd(this))
         .put(MIGRATE, new MigrateCmd(this))
             .put(CREATE, new CreateCollectionCmd(this, overseer.getCoreContainer(), cloudManager))
@@ -502,39 +500,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     }
   }
 
-
-  //TODO should we not remove in the next release ?
-  @SuppressWarnings({"unchecked"})
-  private void migrateStateFormat(ClusterState state, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results) throws Exception {
-    final String collectionName = message.getStr(COLLECTION_PROP);
-
-    boolean firstLoop = true;
-    // wait for a while until the state format changes
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-
-    // TODO: don't poll
-    while (! timeout.hasTimedOut()) {
-      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-      if (collection == null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
-      }
-      if (collection.getStateFormat() == 2) {
-        // Done.
-        results.add("success", new SimpleOrderedMap<>());
-        return;
-      }
-
-      if (firstLoop) {
-        // Actually queue the migration command.
-        firstLoop = false;
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
-        overseer.offerStateUpdate(Utils.toJSON(m));
-      }
-      timeout.sleep(100);
-    }
-    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
-  }
-
   @SuppressWarnings({"unchecked"})
   void commit(@SuppressWarnings({"rawtypes"})NamedList results, String slice, Replica parentShardLeader) {
     log.debug("Calling soft commit to make sub shard updates visible");
@@ -666,8 +631,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     if(configName != null) {
       validateConfigOrThrowSolrException(configName);
 
-      boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
-      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
+      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName);
       reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
     }
 
@@ -779,7 +743,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
    * This doesn't validate the config (path) itself and is just responsible for creating the confNode.
    * That check should be done before the config node is created.
    */
-  public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
+  public static void createConfNode(DistribStateManager stateManager, String configName, String coll) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
 
     if (configName != null) {
       String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
@@ -791,11 +755,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
       }
     } else {
-      if(isLegacyCloud){
-        log.warn("Could not obtain config name");
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
-      }
+      throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index d92cef0..e32aa41 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -322,7 +322,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
 
       propMap.put(ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode);
       propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, true);
-      propMap.put(DocCollection.STATE_FORMAT, message.getInt(DocCollection.STATE_FORMAT, coll.getStateFormat()));
       if (rf != null) {
         propMap.put(ZkStateReader.REPLICATION_FACTOR, rf);
       }
@@ -348,7 +347,6 @@ public class ReindexCollectionCmd implements OverseerCollectionMessageHandler.Cm
           CommonParams.NAME, chkCollection,
           ZkStateReader.NUM_SHARDS_PROP, "1",
           ZkStateReader.REPLICATION_FACTOR, "1",
-          DocCollection.STATE_FORMAT, "2",
           CollectionAdminParams.COLL_CONF, "_default",
           CommonAdminParams.WAIT_FOR_FINAL_STATE, "true"
       );
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 3051b71..b209462 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -64,7 +64,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
@@ -108,6 +107,14 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
     Properties properties = backupMgr.readBackupProperties(location, backupName);
     String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
+
+    // Check whether the backed-up collection uses stateFormat=1 (i.e. not 2), which was only supported before Solr 9; such a backup can't be restored.
+    Object format = properties.get("stateFormat");
+    if (format != null && !"2".equals(format)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection " + backupCollection + " is in stateFormat=" + format +
+          " no longer supported in Solr 9 and above. It can't be restored. If it originates in Solr 8 you can restore" +
+          " it there, migrate it to stateFormat=2 and backup again, it will then be restorable on Solr 9");
+    }
     String backupCollectionAlias = properties.getProperty(BackupManager.COLLECTION_ALIAS_PROP);
     DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
 
@@ -161,9 +168,6 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
       Map<String, Object> propMap = new HashMap<>();
       propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
       propMap.put("fromApi", "true"); // mostly true.  Prevents autoCreated=true in the collection state.
-      if (properties.get(STATE_FORMAT) == null) {
-        propMap.put(STATE_FORMAT, "2");
-      }
       propMap.put(REPLICATION_FACTOR, numNrtReplicas);
       propMap.put(NRT_REPLICAS, numNrtReplicas);
       propMap.put(TLOG_REPLICAS, numTlogReplicas);
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index fbaa47f..cee479d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -233,7 +233,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
           Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
           DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
           String path = ZkStateReader.getCollectionPath(name);
-          coll = new DocCollection(name, slices, collProps, router, zkVersion + 1, path);
+          coll = new DocCollection(name, slices, collProps, router, zkVersion + 1);
           try {
             SimDistribStateManager stateManager = cloudManager.getSimDistribStateManager();
             byte[] data = Utils.toJSON(Collections.singletonMap(name, coll));
@@ -2524,7 +2524,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
       lock.lock();
       try {
         Map<String, DocCollection> states = getCollectionStates();
-        ClusterState state = new ClusterState(0, liveNodes.get(), states);
+        ClusterState state = new ClusterState(liveNodes.get(), states);
         return state;
       } finally {
         lock.unlock();
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
index e011b4c..af385c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
 import org.noggit.CharArr;
 import org.noggit.JSONWriter;
@@ -47,7 +46,7 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
   public SnapshotClusterStateProvider(ClusterStateProvider other) throws Exception {
     liveNodes = Set.copyOf(other.getLiveNodes());
     ClusterState otherState = other.getClusterState();
-    clusterState = new ClusterState(otherState.getZNodeVersion(), liveNodes, otherState.getCollectionsMap());
+    clusterState = new ClusterState(liveNodes, otherState.getCollectionsMap());
     clusterProperties = new HashMap<>(other.getClusterProperties());
   }
 
@@ -69,15 +68,13 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
         collMap = mutableState;
         mutableState = Collections.singletonMap(name, state);
       }
-      Integer version = Integer.parseInt(String.valueOf(collMap.getOrDefault("zNodeVersion", stateVersion)));
-      String path = String.valueOf(collMap.getOrDefault("zNode", ZkStateReader.getCollectionPath(name)));
+      int version = Integer.parseInt(String.valueOf(collMap.getOrDefault("zNodeVersion", stateVersion)));
       collMap.remove("zNodeVersion");
-      collMap.remove("zNode");
       byte[] data = Utils.toJSON(mutableState);
-      ClusterState collState = ClusterState.load(version, data, Collections.emptySet(), path);
+      ClusterState collState = ClusterState.createFromJson(version, data, Collections.emptySet());
       collectionStates.put(name, collState.getCollection(name));
     });
-    clusterState = new ClusterState(stateVersion, liveNodes, collectionStates);
+    clusterState = new ClusterState(liveNodes, collectionStates);
   }
 
   public Map<String, Object> getSnapshot() {
@@ -97,7 +94,6 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
         @SuppressWarnings({"unchecked"})
         Map<String, Object> collMap = new LinkedHashMap<>((Map<String, Object>)Utils.fromJSON(json.getBytes("UTF-8")));
         collMap.put("zNodeVersion", coll.getZNodeVersion());
-        collMap.put("zNode", coll.getZNode());
         // format compatible with the real /state.json, which uses a mini-ClusterState
         // consisting of a single collection
         stateMap.put(coll.getName(), Collections.singletonMap(coll.getName(), collMap));
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index f7373d6..0409e4a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -111,12 +111,8 @@ public class ClusterStateMutator {
       collectionProps.put("autoCreated", "true");
     }
 
-    //TODO default to 2; but need to debug why BasicDistributedZk2Test fails early on
-    String znode = message.getInt(DocCollection.STATE_FORMAT, 1) == 1 ? null
-            : ZkStateReader.getCollectionPath(cName);
-
     DocCollection newCollection = new DocCollection(cName,
-            slices, collectionProps, router, -1, znode);
+            slices, collectionProps, router, -1);
 
     return new ZkWriteCommand(cName, newCollection);
   }
@@ -171,16 +167,5 @@ public class ClusterStateMutator {
     }
     return null;
   }
-
-  public ZkWriteCommand migrateStateFormat(ClusterState clusterState, ZkNodeProps message) {
-    final String collection = message.getStr(ZkStateReader.COLLECTION_PROP);
-    if (!CollectionMutator.checkKeyExistence(message, ZkStateReader.COLLECTION_PROP)) return ZkStateWriter.NO_OP;
-    DocCollection coll = clusterState.getCollectionOrNull(collection);
-    if (coll == null || coll.getStateFormat() == 2) return ZkStateWriter.NO_OP;
-
-    return new ZkWriteCommand(coll.getName(),
-            new DocCollection(coll.getName(), coll.getSlicesMap(), coll.getProperties(), coll.getRouter(), 0,
-                    ZkStateReader.getCollectionPath(collection)));
-  }
 }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 1c2be1b..fb783c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -136,7 +136,7 @@ public class CollectionMutator {
     }
 
     return new ZkWriteCommand(coll.getName(),
-        new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion(), coll.getZNode()));
+        new DocCollection(coll.getName(), coll.getSlicesMap(), m, coll.getRouter(), coll.getZNodeVersion()));
   }
 
   public static DocCollection updateSlice(String collectionName, DocCollection collection, Slice slice) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 3f0e297..34bd120 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -205,12 +205,26 @@ public class ReplicaMutator {
     return new ZkWriteCommand(collectionName, newCollection);
   }
 
+  /**
+   * Handles state updates
+   */
   public ZkWriteCommand setState(ClusterState clusterState, ZkNodeProps message) {
-    if (Overseer.isLegacy(cloudManager.getClusterStateProvider())) {
-      return updateState(clusterState, message);
-    } else {
-      return updateStateNew(clusterState, message);
+    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
+    String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+    if (collectionName == null || sliceName == null) {
+      log.error("Invalid collection and slice {}", message);
+      return ZkStateWriter.NO_OP;
     }
+    DocCollection collection = clusterState.getCollectionOrNull(collectionName);
+    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
+    if (slice == null) {
+      log.error("No such slice exists {}", message);
+      return ZkStateWriter.NO_OP;
+    }
+
+    return updateState(clusterState, message);
   }
 
   protected ZkWriteCommand updateState(final ClusterState prevState, ZkNodeProps message) {
@@ -357,28 +371,6 @@ public class ReplicaMutator {
     return new ZkWriteCommand(collectionName, newCollection);
   }
 
-  /**
-   * Handles non-legacy state updates
-   */
-  protected ZkWriteCommand updateStateNew(ClusterState clusterState, final ZkNodeProps message) {
-    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-    if (!checkCollectionKeyExistence(message)) return ZkStateWriter.NO_OP;
-    String sliceName = message.getStr(ZkStateReader.SHARD_ID_PROP);
-
-    if (collectionName == null || sliceName == null) {
-      log.error("Invalid collection and slice {}", message);
-      return ZkStateWriter.NO_OP;
-    }
-    DocCollection collection = clusterState.getCollectionOrNull(collectionName);
-    Slice slice = collection != null ? collection.getSlice(sliceName) : null;
-    if (slice == null) {
-      log.error("No such slice exists {}", message);
-      return ZkStateWriter.NO_OP;
-    }
-
-    return updateState(clusterState, message);
-  }
-
   private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
     Slice slice = collection.getSlice(sliceName);
     Map<String, Object> sliceProps = slice.getProperties();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index cf99221..c405d62 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -51,6 +51,20 @@ import java.util.concurrent.TimeoutException;
 
 
 // nocommit - need to allow for a configurable flush interval again
+/**
+ * ZkStateWriter is responsible for writing updates to the cluster state stored in ZooKeeper for collections,
+ * each of which gets its own individual state.json in ZK.
+ *
+ * Updates to the cluster state are specified using the
+ * {@link #enqueueUpdate(ClusterState, List, ZkWriteCallback)} method. The class buffers updates
+ * to reduce the number of writes to ZK. The buffered updates are flushed during <code>enqueueUpdate</code>
+ * automatically if necessary. The writePendingUpdates(ClusterState) method can be used to force a flush of any pending updates.
+ *
+ * If either enqueueUpdate(ClusterState, List, ZkWriteCallback) or writePendingUpdates(ClusterState)
+ * throws a {@link org.apache.zookeeper.KeeperException.BadVersionException} then the internal buffered state of the
+ * class is suspect and the current instance of the class should be discarded and a new instance should be created
+ * and used for any future updates.
+ */
 public class ZkStateWriter {
   // pleeeease leeeeeeeeeeets not - THERE HAS TO BE  BETTER WAY
   // private static final long MAX_FLUSH_INTERVAL = TimeUnit.NANOSECONDS.convert(Overseer.STATE_UPDATE_DELAY, TimeUnit.MILLISECONDS);
@@ -89,7 +103,7 @@ public class ZkStateWriter {
    * <p>
    * The modified state may be buffered or flushed to ZooKeeper depending on the internal buffering
    * logic of this class. The {@link #hasPendingUpdates()} method may be used to determine if the
-   * last enqueue operation resulted in buffered state. The method {@link #writePendingUpdates(ClusterState)} can
+   * last enqueue operation resulted in buffered state. The method writePendingUpdates(ClusterState) can
    * be used to force an immediate flush of pending cluster state changes.
    *
    * @param state the cluster state information on which the given <code>cmd</code> is applied
@@ -212,8 +226,7 @@ public class ZkStateWriter {
             }
 
             DocCollection newCollection = new DocCollection(name, newSliceMap,
-                c.getProperties(), c.getRouter(), prevVersion,
-                path);
+                c.getProperties(), c.getRouter(), prevVersion);
 
             if (log.isDebugEnabled()) {
               log.debug("The new collection {}", newCollection);
@@ -231,7 +244,7 @@ public class ZkStateWriter {
             }
             //   assert c.getStateFormat() > 1;
             DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(),
-                0, path);
+                0);
 
             LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionStates());
             collStates.put(name, new ClusterState.CollectionRef(newCollection));
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index c083a17..15d392d 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -63,6 +63,7 @@ import org.apache.solr.client.solrj.impl.XMLResponseParser;
 import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.autoscaling.AutoScalingHandler;
 import org.apache.solr.common.AlreadyClosedException;
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index 902fbb6..5b9b533 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -139,7 +139,7 @@ public class BackupManager {
     try (IndexInput is = repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
       byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
       is.readBytes(arr, 0, (int) is.length());
-      ClusterState c_state = ClusterState.load(-1, arr, Collections.emptySet());
+      ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet());
       return c_state.getCollection(collectionName);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index 262b97f..aafe5e9 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -80,10 +79,6 @@ public class ClusterStatus {
 
     ClusterState clusterState = zkStateReader.getClusterState();
 
-    // convert cluster state into a map of writable types
-    byte[] bytes = Utils.toJSON(clusterState);
-    Map<String, Object> stateMap = (Map<String,Object>) Utils.fromJSON(bytes);
-
     String routeKey = message.getStr(ShardParams._ROUTE_);
     String shard = message.getStr(ZkStateReader.SHARD_ID_PROP);
 
@@ -135,13 +130,9 @@ public class ClusterStatus {
         requestedShards.addAll(Arrays.asList(paramShards));
       }
 
-      if (clusterStateCollection.getStateFormat() > 1) {
-        bytes = Utils.toJSON(clusterStateCollection);
+        byte[] bytes = Utils.toJSON(clusterStateCollection);
         Map<String, Object> docCollection = (Map<String, Object>) Utils.fromJSON(bytes);
         collectionStatus = getCollectionStatus(docCollection, name, requestedShards);
-      } else {
-        collectionStatus = getCollectionStatus((Map<String, Object>) stateMap.get(name), name, requestedShards);
-      }
 
       collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
       if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
@@ -166,8 +157,7 @@ public class ClusterStatus {
     clusterStatus.add("collections", collectionProps);
 
     // read cluster properties
-    @SuppressWarnings({"rawtypes"})
-    Map clusterProps = zkStateReader.getClusterProperties();
+    Map<String, Object> clusterProps = zkStateReader.getClusterProperties();
     if (clusterProps != null && !clusterProps.isEmpty())  {
       clusterStatus.add("properties", clusterProps);
     }
@@ -233,19 +223,17 @@ public class ClusterStatus {
 
   @SuppressWarnings("unchecked")
   protected void crossCheckReplicaStateWithLiveNodes(List<String> liveNodes, NamedList<Object> collectionProps) {
-    Iterator<Map.Entry<String,Object>> colls = collectionProps.iterator();
-    while (colls.hasNext()) {
-      Map.Entry<String,Object> next = colls.next();
-      Map<String,Object> collMap = (Map<String,Object>)next.getValue();
-      Map<String,Object> shards = (Map<String,Object>)collMap.get("shards");
+    for (Map.Entry<String, Object> next : collectionProps) {
+      Map<String, Object> collMap = (Map<String, Object>) next.getValue();
+      Map<String, Object> shards = (Map<String, Object>) collMap.get("shards");
       for (Object nextShard : shards.values()) {
-        Map<String,Object> shardMap = (Map<String,Object>)nextShard;
-        Map<String,Object> replicas = (Map<String,Object>)shardMap.get("replicas");
+        Map<String, Object> shardMap = (Map<String, Object>) nextShard;
+        Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas");
         for (Object nextReplica : replicas.values()) {
-          Map<String,Object> replicaMap = (Map<String,Object>)nextReplica;
+          Map<String, Object> replicaMap = (Map<String, Object>) nextReplica;
           if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
             // not down, so verify the node is live
-            String node_name = (String)replicaMap.get(ZkStateReader.NODE_NAME_PROP);
+            String node_name = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
             if (!liveNodes.contains(node_name)) {
               // node is not live, so this replica is actually down
               replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
@@ -255,6 +243,4 @@ public class ClusterStatus {
       }
     }
   }
-
-
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 3f9cba3..90b7625 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -101,7 +101,6 @@ public class ColStatus {
         continue;
       }
       SimpleOrderedMap<Object> colMap = new SimpleOrderedMap<>();
-      colMap.add("stateFormat", coll.getStateFormat());
       colMap.add("znodeVersion", coll.getZNodeVersion());
       Map<String, Object> props = new TreeMap<>(coll.getProperties());
       props.remove("shards");
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 9ed9158..39bae41 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -103,7 +103,6 @@ import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
 import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
 import static org.apache.solr.common.cloud.DocCollection.RULE;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
@@ -151,7 +150,6 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.LI
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.LISTALIASES;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.LISTSNAPSHOTS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
@@ -499,7 +497,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           ZkStateReader.CREATE_NODE_SET,
           CREATE_NODE_SET_SHUFFLE,
           SHARDS_PROP,
-          STATE_FORMAT,
           AUTO_ADD_REPLICAS,
           RULE,
           SNITCH,
@@ -511,8 +508,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           WITH_COLLECTION,
           ALIAS);
 
-      props.putIfAbsent(STATE_FORMAT, "2");
-
       if (props.get(REPLICATION_FACTOR) != null && props.get(NRT_REPLICAS) != null) {
         //TODO: Remove this in 8.0 . Keep this for SolrJ client back-compat. See SOLR-11676 for more details
         int replicationFactor = Integer.parseInt((String) props.get(REPLICATION_FACTOR));
@@ -604,7 +599,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           CREATE_NODE_SET_SHUFFLE,
           AUTO_ADD_REPLICAS,
           "shards",
-          STATE_FORMAT,
           CommonParams.ROWS,
           CommonParams.Q,
           CommonParams.FL,
@@ -1104,8 +1098,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
       return m;
     }),
-    MIGRATESTATEFORMAT_OP(MIGRATESTATEFORMAT, (req, rsp, h) -> copy(req.getParams().required(), null, COLLECTION_PROP)),
-
     BACKUP_OP(BACKUP, (req, rsp, h) -> {
       req.getParams().required().check(NAME, COLLECTION_PROP);
 
@@ -1214,7 +1206,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
       }
       // from CREATE_OP:
       copy(req.getParams(), params, COLL_CONF, REPLICATION_FACTOR, NRT_REPLICAS, TLOG_REPLICAS,
-          PULL_REPLICAS, MAX_SHARDS_PER_NODE, STATE_FORMAT, AUTO_ADD_REPLICAS, ZkStateReader.CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE);
+          PULL_REPLICAS, MAX_SHARDS_PER_NODE, AUTO_ADD_REPLICAS, ZkStateReader.CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE);
       copyPropertiesWithPrefix(req.getParams(), params, COLL_PROP_PREFIX);
       return params;
     }),
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
index 65328d0..bc6b103 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ZookeeperInfoHandler.java
@@ -263,7 +263,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     }
 
     /**
-     * Create a merged view of all collections (internal from /clusterstate.json and external from /collections/?/state.json
+     * Create a merged view of all collections from /collections/?/state.json
      */
     private synchronized List<String> getCollections(SolrZkClient zkClient) throws KeeperException, InterruptedException {
       if (cachedCollections == null) {
@@ -284,7 +284,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     /**
      * Gets the requested page of collections after applying filters and offsets.
      */
-    public PageOfCollections fetchPage(PageOfCollections page, SolrZkClient zkClient)
+    public void fetchPage(PageOfCollections page, SolrZkClient zkClient)
         throws KeeperException, InterruptedException {
 
 
@@ -306,8 +306,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       // status until reading all status objects from ZK
       if (page.filterType != FilterType.status)
         page.selectPage(children);
-
-      return page;
     }
 
     @Override
@@ -384,7 +382,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     String dumpS = params.get("dump");
     boolean dump = dumpS != null && dumpS.equals("true");
 
-    int start = params.getInt("start", 0);
+    int start = params.getInt("start", 0); // Note: start is ignored if rows is not specified
     int rows = params.getInt("rows", -1);
 
     String filterType = params.get("filterType");
@@ -406,12 +404,19 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     printer.detail = detail;
     printer.dump = dump;
     boolean isGraphView = "graph".equals(params.get("view"));
-    printer.page = (isGraphView && "/clusterstate.json".equals(path))
-        ? new PageOfCollections(start, rows, type, filter) : null;
+    // The /clusterstate.json znode no longer exists (removed in Solr 9), but we behave as if it did and return the collection listing.
+    // If this is cleaned up, services.js must change as well: the collection list is used by the Admin UI Cloud - Graph view.
+    boolean paginateCollections = (isGraphView && "/clusterstate.json".equals(path));
+    printer.page = paginateCollections ? new PageOfCollections(start, rows, type, filter) : null;
     printer.pagingSupport = pagingSupport;
 
     try {
-      printer.print(path);
+      if (paginateCollections) {
+        // List collections with pagination; unlike a normal ZK path lookup, no znode-specific info is returned
+        printer.printPaginatedCollections();
+      } else {
+        printer.print(path);
+      }
     } finally {
       printer.close();
     }
@@ -433,7 +438,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
     String keeperAddr; // the address we're connected to
 
     final BAOS baos = new BAOS();
-    final Writer out = new OutputStreamWriter(baos,  StandardCharsets.UTF_8);
+    final Writer out = new OutputStreamWriter(baos, StandardCharsets.UTF_8);
     SolrZkClient zkClient;
 
     PageOfCollections page;
@@ -454,7 +459,7 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       }
     }
 
-    // main entry point
+    // main entry point for printing from path
     void print(String path) throws IOException {
       if (zkClient == null) {
         return;
@@ -502,6 +507,90 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       out.write(chars.toString());
     }
 
+    // main entry point for printing collections
+    @SuppressWarnings("unchecked")
+    void printPaginatedCollections() throws IOException {
+      SortedMap<String, Object> collectionStates;
+      try {
+        // support paging of the collections graph view (in case there are many collections)
+        // fetch the requested page of collections and then retrieve the state for each
+        pagingSupport.fetchPage(page, zkClient);
+        // keep track of how many collections match the filter
+        boolean applyStatusFilter = (page.filterType == FilterType.status && page.filter != null);
+        List<String> matchesStatusFilter = applyStatusFilter ? new ArrayList<String>() : null;
+        Set<String> liveNodes = applyStatusFilter ?
+            zkController.getZkStateReader().getClusterState().getLiveNodes() : null;
+
+        collectionStates = new TreeMap<>(pagingSupport);
+        for (String collection : page.selected) {
+          // Get collection state from ZK
+          String collStatePath = String.format(Locale.ROOT, "/collections/%s/state.json", collection);
+          String childDataStr = null;
+          try {
+            byte[] childData = zkClient.getData(collStatePath, null, null);
+            if (childData != null)
+              childDataStr = (new BytesRef(childData)).utf8ToString();
+          } catch (KeeperException.NoNodeException nne) {
+            log.warn("State for collection {} not found.", collection);
+          } catch (Exception childErr) {
+            log.error("Failed to get {} due to", collStatePath, childErr);
+          }
+
+          if (childDataStr != null) {
+            Map<String, Object> extColl = (Map<String, Object>) Utils.fromJSONString(childDataStr);
+            Object collectionState = extColl.get(collection);
+
+            if (applyStatusFilter) {
+              // verify this collection matches the filtered state
+              if (page.matchesStatusFilter((Map<String, Object>) collectionState, liveNodes)) {
+                matchesStatusFilter.add(collection);
+                collectionStates.put(collection, collectionState);
+              }
+            } else {
+              collectionStates.put(collection, collectionState);
+            }
+          }
+        }
+
+        if (applyStatusFilter) {
+          // update the paged navigation info after applying the status filter
+          page.selectPage(matchesStatusFilter);
+
+          // rebuild the Map of state data
+          SortedMap<String, Object> map = new TreeMap<String, Object>(pagingSupport);
+          for (String next : page.selected)
+            map.put(next, collectionStates.get(next));
+          collectionStates = map;
+        }
+      } catch (KeeperException | InterruptedException e) {
+        writeError(500, e.toString());
+        return;
+      }
+
+      CharArr chars = new CharArr();
+      JSONWriter json = new JSONWriter(chars, 2);
+      json.startObject();
+
+      json.writeString("znode");
+      json.writeNameSeparator();
+      json.startObject();
+
+      // For some reason, without this the JSON is malformed
+      writeKeyValue(json, PATH, "Undefined", true);
+
+      if (collectionStates != null) {
+        CharArr collectionOut = new CharArr();
+        new JSONWriter(collectionOut, 2).write(collectionStates);
+        writeKeyValue(json, "data", collectionOut.toString(), false);
+      }
+
+      writeKeyValue(json, "paging", page.getPagingHeader(), false);
+
+      json.endObject();
+      json.endObject();
+      out.write(chars.toString());
+    }
+
     void writeError(int code, String msg) throws IOException {
       throw new SolrException(ErrorCode.getErrorCode(code), msg);
       /*response.setStatus(code);
@@ -523,7 +612,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       out.write(chars.toString());*/
     }
 
-
     boolean printTree(JSONWriter json, String path) throws IOException {
       String label = path;
       if (!fullpath) {
@@ -624,7 +712,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
       json.write(v);
     }
 
-    @SuppressWarnings("unchecked")
     boolean printZnode(JSONWriter json, String path) throws IOException {
       try {
         String dataStr = null;
@@ -639,95 +726,6 @@ public final class ZookeeperInfoHandler extends RequestHandlerBase {
             dataStrErr = "data is not parsable as a utf8 String: " + e.toString();
           }
         }
-        // support paging of the collections graph view (in case there are many collections)
-        if (page != null) {
-          // we've already pulled the data for /clusterstate.json from ZooKeeper above,
-          // but it needs to be parsed into a map so we can lookup collection states before
-          // trying to find them in the /collections/?/state.json znode
-          Map<String, Object> clusterstateJsonMap = null;
-          if (dataStr != null) {
-            try {
-              clusterstateJsonMap = (Map<String, Object>) Utils.fromJSONString(dataStr);
-            } catch (Exception e) {
-              throw new SolrException(ErrorCode.SERVER_ERROR,
-                  "Failed to parse /clusterstate.json from ZooKeeper due to: " + e, e);
-            }
-          } else {
-            clusterstateJsonMap = Utils.makeMap();
-          }
-
-          // fetch the requested page of collections and then retrieve the state for each 
-          page = pagingSupport.fetchPage(page, zkClient);
-          // keep track of how many collections match the filter
-          boolean applyStatusFilter =
-              (page.filterType == FilterType.status && page.filter != null);
-          List<String> matchesStatusFilter = applyStatusFilter ? new ArrayList<String>() : null;
-          Set<String> liveNodes = applyStatusFilter ?
-              zkController.getZkStateReader().getClusterState().getLiveNodes() : null;
-
-          SortedMap<String, Object> collectionStates = new TreeMap<String, Object>(pagingSupport);
-          for (String collection : page.selected) {
-            Object collectionState = clusterstateJsonMap.get(collection);
-            if (collectionState != null) {
-              // collection state was in /clusterstate.json
-              if (applyStatusFilter) {
-                // verify this collection matches the status filter
-                if (page.matchesStatusFilter((Map<String, Object>) collectionState, liveNodes)) {
-                  matchesStatusFilter.add(collection);
-                  collectionStates.put(collection, collectionState);
-                }
-              } else {
-                collectionStates.put(collection, collectionState);
-              }
-            } else {
-              // looks like an external collection ...
-              String collStatePath = String.format(Locale.ROOT, "/collections/%s/state.json", collection);
-              String childDataStr = null;
-              try {
-                byte[] childData = zkClient.getData(collStatePath, null, null);
-                if (childData != null)
-                  childDataStr = (new BytesRef(childData)).utf8ToString();
-              } catch (KeeperException.NoNodeException nne) {
-                log.warn("State for collection {} not found in /clusterstate.json or /collections/{}/state.json!"
-                    , collection, collection);
-              } catch (Exception childErr) {
-                log.error("Failed to get {} due to", collStatePath, childErr);
-              }
-
-              if (childDataStr != null) {
-                Map<String, Object> extColl = (Map<String, Object>) Utils.fromJSONString(childDataStr);
-                collectionState = extColl.get(collection);
-
-                if (applyStatusFilter) {
-                  // verify this collection matches the filtered state
-                  if (page.matchesStatusFilter((Map<String, Object>) collectionState, liveNodes)) {
-                    matchesStatusFilter.add(collection);
-                    collectionStates.put(collection, collectionState);
-                  }
-                } else {
-                  collectionStates.put(collection, collectionState);
-                }
-              }
-            }
-          }
-
-          if (applyStatusFilter) {
-            // update the paged navigation info after applying the status filter
-            page.selectPage(matchesStatusFilter);
-
-            // rebuild the Map of state data
-            SortedMap<String, Object> map = new TreeMap<String, Object>(pagingSupport);
-            for (String next : page.selected)
-              map.put(next, collectionStates.get(next));
-            collectionStates = map;
-          }
-
-          if (collectionStates != null) {
-            CharArr out = new CharArr();
-            new JSONWriter(out, 2).write(collectionStates);
-            dataStr = out.toString();
-          }
-        }
 
         json.writeString("znode");
         json.writeNameSeparator();
diff --git a/solr/core/src/java/org/apache/solr/util/SolrCLI.java b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
index 94ca548..09bfc9b 100755
--- a/solr/core/src/java/org/apache/solr/util/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/util/SolrCLI.java
@@ -1092,7 +1092,6 @@ public class SolrCLI implements CLIO {
       Map<String, Object> results = new LinkedHashMap<>();
       if (withClusterState) {
         Map<String, Object> map = new LinkedHashMap<>();
-        map.put("znodeVersion", clusterState.getZNodeVersion());
         map.put("liveNodes", new TreeSet<>(clusterState.getLiveNodes()));
         map.put("collections", clusterState.getCollectionsMap());
         results.put("CLUSTERSTATE", map);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
index f41d80a..87629d2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateMockUtil.java
@@ -184,7 +184,7 @@ public class ClusterStateMockUtil {
       }
     }
 
-    ClusterState clusterState = new ClusterState(1, new HashSet<>(Arrays.asList(liveNodes)), collectionStates);
+    ClusterState clusterState = new ClusterState(new HashSet<>(Arrays.asList(liveNodes)), collectionStates);
     MockZkStateReader reader = new MockZkStateReader(clusterState, collectionStates.keySet());
 
     String json;
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
index 6488de8..4047811 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
@@ -58,10 +58,10 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
     collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
     collectionStates.put("collection2", new DocCollection("collection2", slices, null, DocRouter.DEFAULT));
 
-    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
     // System.out.println("#################### " + new String(bytes));
-    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
@@ -69,13 +69,13 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
     assertEquals("Properties not copied properly", replica.getStr("prop1"), loadedClusterState.getCollection("collection1").getSlice("shard1").getReplicasMap().get("node1").getStr("prop1"));
     assertEquals("Properties not copied properly", replica.getStr("prop2"), loadedClusterState.getCollection("collection1").getSlice("shard1").getReplicasMap().get("node1").getStr("prop2"));
 
-    loadedClusterState = ClusterState.load(-1, new byte[0], liveNodes);
+    loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
     assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size());
 
-    loadedClusterState = ClusterState.load(-1, (byte[])null, liveNodes);
+    loadedClusterState = ClusterState.createFromJson(-1, (byte[])null, liveNodes);
     
     assertEquals("Provided liveNodes not used properly", 2, loadedClusterState
         .getLiveNodes().size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
index a7f666c..e0df450 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
@@ -52,11 +52,7 @@ public class CollectionPropsTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupClass() throws Exception {
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-
     configureCluster(4)
-        .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
         .addConfig("conf", configset("cloud-minimal")).formatZk(true)
         .configure();
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionStateFormat2Test.java b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
similarity index 91%
rename from solr/core/src/test/org/apache/solr/cloud/CollectionStateFormat2Test.java
rename to solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
index 0d31b84..f544d35 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionStateFormat2Test.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionStateZnodeTest.java
@@ -26,7 +26,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 @Ignore // debug
-public class CollectionStateFormat2Test extends SolrCloudTestCase {
+public class CollectionStateZnodeTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -56,12 +56,11 @@ public class CollectionStateFormat2Test extends SolrCloudTestCase {
     DocCollection c = getCollectionState(collectionName);
 
     assertEquals("DocCollection version should equal the znode version", stat.getVersion(), c.getZNodeVersion() );
-    assertTrue("DocCollection#getStateFormat() must be > 1", c.getStateFormat() > 1);
 
     // remove collection
     CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
 
-    assertFalse("collection state should not exist externally",
+    assertFalse("collection state should not exist",
         zkClient().exists(ZkStateReader.getCollectionPath(collectionName)));
 
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 920e3bb..d047f83 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -116,7 +116,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 
   @Before
   public void beforeTest() throws Exception {
-
   }
   
   @After
@@ -318,18 +317,17 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCreateAndDeleteCollection() throws Exception {
     String collectionName = "solrj_test";
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-            .setMaxShardsPerNode(4).process(cluster.getSolrClient());
-    assertEquals(response.toString(), 0, response.getStatus());
-    assertTrue(response.toString(), response.isSuccess());
+        .process(cluster.getSolrClient());
 
-    // nocommit - there is still a race around getting response for too fast a request
-//    Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
-//    assertEquals(4, coresStatus.size());
-//    for (String coreName : coresStatus.keySet()) {
-//      NamedList<Integer> status = coresStatus.get(coreName);
-//      assertEquals(0, (int)status.get("status"));
-//      assertTrue(status.get("QTime") > 0);
-//    }
+    assertEquals(0, response.getStatus());
+    assertTrue(response.isSuccess());
+    Map<String, NamedList<Integer>> coresStatus = response.getCollectionCoresStatus();
+    assertEquals(4, coresStatus.size());
+    for (String coreName : coresStatus.keySet()) {
+      NamedList<Integer> status = coresStatus.get(coreName);
+      assertEquals(0, (int)status.get("status"));
+      assertTrue(status.get("QTime") > 0);
+    }
 
     response = CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
 
@@ -338,11 +336,10 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
 //    Map<String,NamedList<Integer>> nodesStatus = response.getCollectionNodesStatus();
 //    assertEquals(TEST_NIGHTLY ? 4 : 2, nodesStatus.size());
 
-    // Test Creating a collection with new stateformat.
-    collectionName = "solrj_newstateformat";
+    // Test Creating a new collection.
+    collectionName = "solrj_test2";
 
     response = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-        .setStateFormat(2)
         .setMaxShardsPerNode(4).process(cluster.getSolrClient());
     assertEquals(0, response.getStatus());
     assertTrue(response.isSuccess());
@@ -353,7 +350,6 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
   public void testCloudInfoInCoreStatus() throws IOException, SolrServerException {
     String collectionName = "corestatus_test";
     CollectionAdminResponse response = CollectionAdminRequest.createCollection(collectionName, "conf", 2, 2)
-        .setStateFormat(1)
         .process(cluster.getSolrClient());
 
     assertEquals(0, response.getStatus());
@@ -559,21 +555,21 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
     // sanity check our expected default
     final ClusterProperties props = new ClusterProperties(zkClient());
     assertEquals("Expecting prop to default to unset, test needs upated",
-                 props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), null);
+                 props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), null);
     
-    CollectionAdminResponse response = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true")
+    CollectionAdminResponse response = CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "true")
       .process(cluster.getSolrClient());
     assertEquals(0, response.getStatus());
-    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), "true");
+    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), "true");
 
     // Unset ClusterProp that we set.
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-    assertEquals("Cluster property was not unset", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), null);
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null).process(cluster.getSolrClient());
+    assertEquals("Cluster property was not unset", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), null);
 
-    response = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
+    response = CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false")
         .process(cluster.getSolrClient());
     assertEquals(0, response.getStatus());
-    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, null), "false");
+    assertEquals("Cluster property was not set", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, null), "false");
   }
 
   @Test
diff --git a/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java b/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
index 620f16f..e2808b7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CreateRoutedAliasTest.java
@@ -146,7 +146,6 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
     assertEquals(1, coll.getNumTlogReplicas().intValue()); // per-shard
     assertEquals(1, coll.getNumPullReplicas().intValue()); // per-shard
     assertEquals(4, coll.getMaxShardsPerNode());
-    //TODO SOLR-11877 assertEquals(2, coll.getStateFormat());
     assertTrue("nodeSet didn't work?",
         coll.getSlices().stream().flatMap(s -> s.getReplicas().stream())
             .map(Replica::getNodeName).allMatch(createNode::equals));
@@ -193,7 +192,6 @@ public class CreateRoutedAliasTest extends SolrCloudTestCase {
     assertEquals("foo_s", ((Map)coll.get("router")).get("field"));
     assertEquals(1, coll.getSlices().size()); // numShards
     assertEquals(2, coll.getReplicationFactor().intValue()); // num replicas
-    //TODO SOLR-11877 assertEquals(2, coll.getStateFormat());
 
     // Test Alias metadata
     Aliases aliases = cluster.getSolrClient().getZkStateReader().getAliases();
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
index 3a05b5b..721b0bb 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteInactiveReplicaTest.java
@@ -27,7 +27,6 @@ import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
@@ -49,7 +48,6 @@ public class DeleteInactiveReplicaTest extends SolrCloudTestCase {
     useFactory(null);
     configureCluster(4)
         .addConfig("conf", configset("cloud-minimal"))
-        .withProperty(ZkStateReader.LEGACY_CLOUD, "false")
         .configure();
   }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index f70d992..0c0997b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -207,20 +207,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
   @Test
   @Ignore // nocommit: investigate
   public void deleteReplicaFromClusterState() throws Exception {
-    deleteReplicaFromClusterState("false");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-  
-  @Test
-  @Ignore // nocommit: investigate
-  public void deleteReplicaFromClusterStateLegacy() throws Exception {
-    deleteReplicaFromClusterState("true"); 
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-
-  private void deleteReplicaFromClusterState(String legacyCloud) throws Exception {
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
-    final String collectionName = "deleteFromClusterState_"+legacyCloud;
+    final String collectionName = "deleteFromClusterStateCollection";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3)
         .process(cluster.getSolrClient());
     
@@ -230,7 +217,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
     Slice shard = getCollectionState(collectionName).getSlice("shard1");
 
-    // don't choose the leader to shutdown, it just complicates things unneccessarily
+    // don't choose the leader to shutdown, it just complicates things unnecessarily
     Replica replica = getRandomReplica(shard, (r) ->
                                        ( r.getState() == Replica.State.ACTIVE &&
                                          ! r.equals(shard.getLeader())));
@@ -277,24 +264,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
   @Ignore // nocommit: investigate
   // commented out on: 17-Feb-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
   public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
-    raceConditionOnDeleteAndRegisterReplica("false");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-  
-  @Test
-  @Slow
-  @Ignore // nocommit: investigate
-  // commented out on: 17-Feb-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void raceConditionOnDeleteAndRegisterReplicaLegacy() throws Exception {
-    raceConditionOnDeleteAndRegisterReplica("true");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
-  }
-
-  // commented out on: 17-Feb-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void raceConditionOnDeleteAndRegisterReplica(String legacyCloud) throws Exception {
-    
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
-    final String collectionName = "raceDeleteReplica_"+legacyCloud;
+    final String collectionName = "raceDeleteReplicaCollection";
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
         .process(cluster.getSolrClient());
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java b/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
deleted file mode 100644
index 8f83b65..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/LegacyCloudClusterPropTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Properties;
-
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.common.cloud.ClusterProperties;
-import org.apache.solr.common.cloud.ClusterStateUtil;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.core.CorePropertiesLocator;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore // nocommit debug
-public class LegacyCloudClusterPropTest extends SolrCloudTestCase {
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    useFactory(null);
-    // currently this test is fine with a single shard with a single replica and it's simpler. Could easily be
-    // extended to multiple shards/replicas, but there's no particular need.
-    configureCluster(1)
-        .addConfig("conf", configset("cloud-minimal"))
-        .configure();
-  }
-  
-  @After
-  public void afterTest() throws Exception {
-    cluster.deleteAllCollections();
-  }
-
-
-  // Are all these required?
-  private static String[] requiredProps = {
-      "numShards",
-      "collection.configName",
-      "name",
-      "replicaType",
-      "shard",
-      "collection",
-      "coreNodeName"
-  };
-
-  @Test
-  //2018-06-18 (commented) @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
-  //Commented 14-Oct-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 17-Aug-2018
-  // commented out on: 01-Apr-2019   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // annotated on: 24-Dec-2018
-  public void testCreateCollectionSwitchLegacyCloud() throws Exception {
-    createAndTest("legacyTrue", true);
-    createAndTest("legacyFalse", false);
-  }
-
-  private void createAndTest(final String coll, final boolean legacy) throws Exception {
-
-    // First, just insure that core.properties file gets created with coreNodeName and all other mandatory parameters.
-    final String legacyString = Boolean.toString(legacy);
-    final String legacyAnti = Boolean.toString(!legacy);
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyString).process(cluster.getSolrClient());
-    ClusterProperties props = new ClusterProperties(zkClient());
-
-    assertEquals("Value of legacyCloud cluster prop unexpected", legacyString,
-        props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyAnti));
-
-    CollectionAdminRequest.createCollection(coll, "conf", 1, 1)
-        .setMaxShardsPerNode(3)
-        .process(cluster.getSolrClient());
-    
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 120000));
-    
-    // Insure all mandatory properties are there.
-    checkMandatoryProps(coll);
-
-    checkCollectionActive(coll);
-    // The fixes for SOLR-11503 insure that creating a collection has coreNodeName whether legacyCloud is true or false,
-    // we still need to test repairing a properties file that does _not_ have coreNodeName set, the second part of
-    // the fix.
-
-    // First, remove the coreNodeName from cluster.properties and write it out it.
-    removePropertyFromAllReplicas(coll, "coreNodeName");
-
-    // Now restart Solr, this should repair the removal on core load no matter the value of legacyCloud
-    JettySolrRunner jetty = cluster.getJettySolrRunner(0);
-    jetty.stop();
-    
-    cluster.waitForJettyToStop(jetty);
-    
-    jetty.start();
-    
-    cluster.waitForAllNodes(30);
-    
-    checkMandatoryProps(coll);
-    checkCollectionActive(coll);
-  }
-
-  private void checkCollectionActive(String coll) {
-    assertTrue(ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), 120000));
-    DocCollection docColl = getCollectionState(coll);
-    for (Replica rep : docColl.getReplicas()) {
-      if (rep.getState() == Replica.State.ACTIVE) return;
-    }
-    fail("Replica was not active for collection " + coll);
-  }
-  private void removePropertyFromAllReplicas(String coll, String propDel) throws IOException {
-    DocCollection docColl = getCollectionState(coll);
-
-    // First remove the property from all core.properties files
-    for (Replica rep : docColl.getReplicas()) {
-      final String coreName = rep.getCoreName();
-      Properties prop = loadPropFileForReplica(coreName);
-      prop.remove(propDel);
-      JettySolrRunner jetty = cluster.getJettySolrRunner(0);
-      Path expected = Paths.get(jetty.getSolrHome()).toAbsolutePath().resolve(coreName);
-      Path corePropFile = Paths.get(expected.toString(), CorePropertiesLocator.PROPERTIES_FILENAME);
-
-      try (Writer os = new OutputStreamWriter(Files.newOutputStream(corePropFile), StandardCharsets.UTF_8)) {
-        prop.store(os, "");
-      }
-    }
-
-    // Now insure it's really gone
-    for (Replica rep : docColl.getReplicas()) {
-      Properties prop = loadPropFileForReplica(rep.getCoreName());
-      assertEquals("Property " + propDel + " should have been deleted",
-          "bogus", prop.getProperty(propDel, "bogus"));
-    }
-  }
-
-  private Properties loadPropFileForReplica(String coreName) throws IOException {
-    JettySolrRunner jetty = cluster.getJettySolrRunner(0);
-    Path expected = Paths.get(jetty.getSolrHome()).toAbsolutePath().resolve(coreName);
-    Path corePropFile = Paths.get(expected.toString(), CorePropertiesLocator.PROPERTIES_FILENAME);
-    Properties props = new Properties();
-    try (InputStream fis = Files.newInputStream(corePropFile)) {
-      props.load(new InputStreamReader(fis, StandardCharsets.UTF_8));
-    }
-    return props;
-  }
-
-  private void checkMandatoryProps(String coll) throws IOException {
-    DocCollection docColl = getCollectionState(coll);
-    for (Replica rep : docColl.getReplicas()) {
-      Properties prop = loadPropFileForReplica(rep.getCoreName());      for (String testProp : requiredProps) {
-        String propVal = prop.getProperty(testProp, "bogus");
-        if ("bogus".equals(propVal)) {
-          fail("Should have found property " + testProp + " in properties file");
-        }
-      }
-    }
-  }
-}
diff --git a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
index a4cb840..c142a2f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MigrateRouteKeyTest.java
@@ -53,11 +53,6 @@ public class MigrateRouteKeyTest extends SolrCloudTestCase {
     configureCluster(2)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
-
-    if (usually()) {
-      CollectionAdminRequest.setClusterProperty("legacyCloud", "false").process(cluster.getSolrClient());
-      log.info("Using legacyCloud=false for cluster");
-    }
   }
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index d78dfa1..60dd661 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -323,8 +323,6 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
       when(zkStateReaderMock.getBaseUrlForNodeName(address)).thenAnswer(invocation -> address.replaceAll("_", "/"));
     }
 
-    when(zkStateReaderMock.getClusterProperty("legacyCloud", "false")).thenReturn("false");
-
     when(solrZkClientMock.getZkClientTimeout()).thenReturn(30000);
     
     when(clusterStateMock.hasCollection(anyString())).thenAnswer(invocation -> {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 41f1e39..a100735 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -87,7 +87,6 @@ import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event;
-import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -180,16 +179,21 @@ public class OverseerTest extends SolrTestCaseJ4 {
       zkStateReader.close();
     }
 
+    /**
+     * Create a collection by enqueueing a CREATE message on the state update queue of an open Overseer.
+     * Note the similar {@link OverseerTest#createCollection(String, int)}, which uses the first Overseer's queue.
+     */
     public void createCollection(String collection, int numShards) throws Exception {
+      // Create collection znode before having ClusterStateUpdater create state.json below it or it will fail.
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, false, true);
 
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           "name", collection,
           ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, numShards+"",
+          ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
           "createNodeSet", "");
       ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
       q.offer(Utils.toJSON(m));
-
     }
 
     public String publishState(String collection, String coreName, String coreNodeName, String shard, Replica.State stateName, int numShards, boolean startElection, Overseer overseer)
@@ -368,6 +372,23 @@ public class OverseerTest extends SolrTestCaseJ4 {
     super.tearDown();
   }
 
+  /**
+   * This method creates a collection. It is different from {@link MockZKController#createCollection(String, int)} in
+   * the way the {@link ZkDistributedQueue} is obtained.
+   */
+  private void createCollection(String collection, int numShards) throws Exception {
+    // Create collection znode before having ClusterStateUpdater create state.json below it or it will fail.
+    zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, false, true);
+
+    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+        "name", collection,
+        ZkStateReader.REPLICATION_FACTOR, "1",
+        ZkStateReader.NUM_SHARDS_PROP, Integer.toString(numShards),
+        "createNodeSet", "");
+    ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
+    q.offer(Utils.toJSON(m));
+  }
+
   @Test
   public void testShardAssignment() throws Exception {
 
@@ -375,8 +396,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
     SolrZkClient overseerClient = null;
 
     try {
-
-
       ZkController.createClusterZkNodes(zkClient);
 
       overseerClient = electNewOverseer(server.getZkAddress());
@@ -386,15 +405,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
         mockController = new MockZKController(server.getZkAddress(), "127.0.0.1", overseers);
 
-        final int numShards = 6;
+        final int numShards = 6; // number of cores published below; the collection itself has only 3 shards
 
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-            "name", COLLECTION,
-            ZkStateReader.REPLICATION_FACTOR, "1",
-            ZkStateReader.NUM_SHARDS_PROP, "3",
-            "createNodeSet", "");
-        ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
-        q.offer(Utils.toJSON(m));
+        createCollection(COLLECTION, 3);
 
         for (int i = 0; i < numShards; i++) {
           assertNotNull("shard got no id?", mockController.publishState(COLLECTION, "core" + (i + 1), "node" + (i + 1), "shard" + ((i % 3) + 1), Replica.State.ACTIVE, 3, true, overseers.get(0)));
@@ -572,14 +585,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
 
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", COLLECTION,
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          "createNodeSet", "");
-      q.offer(Utils.toJSON(m));
+      createCollection(COLLECTION, 1);
 
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.NODE_NAME_PROP, "node1",
           ZkStateReader.COLLECTION_PROP, COLLECTION,
@@ -818,31 +826,19 @@ public class OverseerTest extends SolrTestCaseJ4 {
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
-      // We did not create /collections -> this message will cause exception when Overseer try to flush the clusterstate
+      // We did not create /collections/collection1 -> this message will cause an exception when the Overseer tries to
+      // flush the collection state
       ZkNodeProps badMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           "name", "collection1",
           ZkStateReader.REPLICATION_FACTOR, "1",
           ZkStateReader.NUM_SHARDS_PROP, "1",
-          DocCollection.STATE_FORMAT, "2",
-          "createNodeSet", "");
-      ZkNodeProps goodMessage = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "collection2",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          DocCollection.STATE_FORMAT, "1",
           "createNodeSet", "");
       ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(zkClient, new Stats());
       workQueue.offer(Utils.toJSON(badMessage));
-      workQueue.offer(Utils.toJSON(goodMessage));
       overseerClient = electNewOverseer(server.getZkAddress());
-      waitForCollections(reader, "collection2");
 
       ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
       q.offer(Utils.toJSON(badMessage));
-      q.offer(Utils.toJSON(goodMessage.plus("name", "collection3")));
-      waitForCollections(reader, "collection2", "collection3");
-      assertNotNull(reader.getClusterState().getCollectionOrNull("collection2"));
-      assertNotNull(reader.getClusterState().getCollectionOrNull("collection3"));
 
       TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
       while(!timeOut.hasTimedOut()) {
@@ -899,6 +895,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
      electNewOverseer(server.getZkAddress());
 
+      // Create collection znode before repeatedly trying to enqueue the Cluster state change message
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, false, true);
+
       for (int i = 0; i < atLeast(4); i++) {
         killCounter.incrementAndGet(); // for each round allow 1 kill
 
@@ -907,7 +906,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
         TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
         while (!timeout.hasTimedOut()) {
           try {
-            mockController.createCollection(COLLECTION, 1);
+            // We must only retry the enqueue to Overseer, not the collection znode creation (that doesn't depend on Overseer)
+            ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
+                "name", COLLECTION,
+                ZkStateReader.REPLICATION_FACTOR, "1",
+                ZkStateReader.NUM_SHARDS_PROP, "1",
+                "createNodeSet", "");
+            ZkDistributedQueue q = MiniSolrCloudCluster.getOpenOverseer(overseers).getStateUpdateQueue();
+            q.offer(Utils.toJSON(m));
             break;
           } catch (SolrException | KeeperException | AlreadyClosedException e) {
             e.printStackTrace();
@@ -1079,19 +1085,20 @@ public class OverseerTest extends SolrTestCaseJ4 {
     try {
 
       ZkController.createClusterZkNodes(zkClient);
+      overseerClient = electNewOverseer(server.getZkAddress());
 
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
       mockController = new MockZKController(server.getZkAddress(), "node1", overseers);
 
-      final int MAX_COLLECTIONS = 10, MAX_CORES = 10, MAX_STATE_CHANGES = 20000, STATE_FORMAT = 2;
+      final int MAX_COLLECTIONS = 10, MAX_CORES = 10, MAX_STATE_CHANGES = 20000;
 
       for (int i=0; i<MAX_COLLECTIONS; i++)  {
+        zkClient.makePath("/collections/perf" + i, false, true);
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
             "name", "perf" + i,
             ZkStateReader.NUM_SHARDS_PROP, "1",
-            "stateFormat", String.valueOf(STATE_FORMAT),
             ZkStateReader.REPLICATION_FACTOR, "1",
             ZkStateReader.MAX_SHARDS_PER_NODE, "1"
             );
@@ -1105,11 +1112,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
             ZkStateReader.STATE_PROP, Replica.State.RECOVERING.toString(),
             ZkStateReader.NODE_NAME_PROP,  "node1",
             ZkStateReader.CORE_NAME_PROP, "core" + k,
+            ZkStateReader.SHARD_ID_PROP, "shard1",
             ZkStateReader.CORE_NODE_NAME_PROP, "node1",
             ZkStateReader.COLLECTION_PROP, "perf" + j,
             ZkStateReader.NUM_SHARDS_PROP, "1",
-            ZkStateReader.BASE_URL_PROP, "http://" +  "node1"
-            + "/solr/");
+            ZkStateReader.BASE_URL_PROP, "http://" + "node1" + "/solr/");
         ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
         q.offer(Utils.toJSON(m));
         if (j >= MAX_COLLECTIONS - 1) j = 0;
@@ -1117,30 +1124,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
         if (i > 0 && i % 100 == 0) log.info("Published {} items", i);
       }
 
-      // let's publish a sentinel collection which we'll use to wait for overseer to complete operations
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
-          ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString(),
-          ZkStateReader.NODE_NAME_PROP, "node1",
-          ZkStateReader.CORE_NAME_PROP, "core1",
-          ZkStateReader.CORE_NODE_NAME_PROP, "node1",
-          ZkStateReader.COLLECTION_PROP, "perf_sentinel",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          ZkStateReader.BASE_URL_PROP, "http://" + "node1"
-          + "/solr/");
-      ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
-      q.offer(Utils.toJSON(m));
+      // let's create a sentinel collection which we'll use to wait for overseer to complete operations
+      createCollection("perf_sentinel", 1);
 
       Timer t = new Timer();
       Timer.Context context = t.time();
-      try {
-        overseerClient = electNewOverseer(server.getZkAddress());
-        assertTrue(overseers.size() > 0);
-
-        reader.waitForState("perf_sentinel", 15000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null);
-
-      } finally {
-        context.stop();
-      }
+      reader.waitForState("perf_sentinel", 15000, TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> collectionState != null);
+      context.stop();
 
       log.info("Overseer loop finished processing: ");
       printTimingStats(t);
@@ -1210,6 +1200,8 @@ public class OverseerTest extends SolrTestCaseJ4 {
       //prepopulate work queue with some items to emulate previous overseer died before persisting state
       DistributedQueue queue = Overseer.getInternalWorkQueue(zkClient, new Stats());
 
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, false, true);
+
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
           "name", COLLECTION,
           ZkStateReader.REPLICATION_FACTOR, "1",
@@ -1271,8 +1263,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       ZkController.createClusterZkNodes(zkClient);
 
-      zkClient.create("/collections/test", null, CreateMode.PERSISTENT, true);
-
       reader = new ZkStateReader(zkClient);
       reader.createClusterStateWatchersAndUpdate();
 
@@ -1280,15 +1270,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
 
+      createCollection("c1", 1);
 
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "c1",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          ZkStateReader.NUM_SHARDS_PROP, "1",
-          "createNodeSet", "");
-      q.offer(Utils.toJSON(m));
-
-      m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(),
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.SHARD_ID_PROP, "shard1",
           ZkStateReader.NODE_NAME_PROP, "node1",
@@ -1326,28 +1310,33 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
       q.offer(Utils.toJSON(m));
 
-      Stat stat = new Stat();
-      byte[] data = zkClient.getData("/clusterstate.json", null, stat);
-      // Simulate an external modification
-      zkClient.setData("/clusterstate.json", data, true);
+      final String testCollectionName = "test";
+      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + testCollectionName, false, true);
 
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
-          "name", "test",
+          "name", testCollectionName,
           ZkStateReader.NUM_SHARDS_PROP, "1",
-          ZkStateReader.REPLICATION_FACTOR, "1",
-          DocCollection.STATE_FORMAT, "2"
+          ZkStateReader.REPLICATION_FACTOR, "1"
       );
       q.offer(Utils.toJSON(m));
 
+      // Wait for the overseer to create state.json for the collection
+      waitForCollections(reader, testCollectionName);
+
+      final String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + testCollectionName + "/state.json";
+      byte[] data = zkClient.getData(path, null, null);
+      // Simulate an external modification of state.json
+      zkClient.setData(path, data, true);
+
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATESHARD.toLower(),
-          "collection", "test",
+          "collection", testCollectionName,
           ZkStateReader.SHARD_ID_PROP, "x",
           ZkStateReader.REPLICATION_FACTOR, "1"
       );
       q.offer(Utils.toJSON(m));
 
       m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDREPLICA.toLower(),
-          "collection", "test",
+          "collection", testCollectionName,
           ZkStateReader.SHARD_ID_PROP, "x",
           ZkStateReader.BASE_URL_PROP, "http://127.0.0.1/solr",
           ZkStateReader.CORE_NODE_NAME_PROP, "core_node1",
@@ -1357,8 +1346,9 @@ public class OverseerTest extends SolrTestCaseJ4 {
       );
       q.offer(Utils.toJSON(m));
 
-      waitForCollections(reader, "test");
-      verifyReplicaStatus(reader, "test", "x", "core_node1", Replica.State.DOWN);
+      // Verify that replica creation succeeded despite the external update of state.json (in theory such updates do
+      // not happen unless an old Overseer is still updating ZK after a new Overseer got elected).
+      verifyReplicaStatus(reader, testCollectionName, "x", "core_node1", Replica.State.DOWN);
 
       waitForCollections(reader, "c1");
       verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.ACTIVE);
@@ -1475,6 +1465,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
       // create collection
       {
         final Integer maxShardsPerNode = numReplicas * numShards;
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + COLLECTION, false, true);
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(),
             "name", COLLECTION,
             ZkStateReader.NUM_SHARDS_PROP, numShards.toString(),
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java
index e1b8641..1531821 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardRoutingCustomTest.java
@@ -58,8 +58,6 @@ public class ShardRoutingCustomTest extends AbstractFullDistribZkTestBase {
   private void doCustomSharding() throws Exception {
     printLayout();
 
-  
-
     File jettyDir = createTempDir("jetty").toFile();
     jettyDir.mkdirs();
     setupJettySolrHome(jettyDir);
@@ -67,7 +65,6 @@ public class ShardRoutingCustomTest extends AbstractFullDistribZkTestBase {
     j.start();
     assertEquals(0, CollectionAdminRequest
         .createCollection(DEFAULT_COLLECTION, "conf1", 1, 1)
-        .setStateFormat(Integer.parseInt(getStateFormat()))
         .setCreateNodeSet("")
         .process(cloudClient).getStatus());
     assertTrue(CollectionAdminRequest
diff --git a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
index 5c00416..64d59d6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SharedFSAutoReplicaFailoverTest.java
@@ -101,11 +101,6 @@ public class SharedFSAutoReplicaFailoverTest extends AbstractFullDistribZkTestBa
   public void setUp() throws Exception {
     super.setUp();
     collectionUlogDirMap.clear();
-    if (random().nextBoolean()) {
-      CollectionAdminRequest.setClusterProperty("legacyCloud", "false").process(cloudClient);
-    } else {
-      CollectionAdminRequest.setClusterProperty("legacyCloud", "true").process(cloudClient);
-    }
   }
   
   @Override
diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
index 5f8f17e..fcaf2c9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
@@ -53,9 +53,9 @@ public class SliceStateTest extends SolrTestCaseJ4 {
     slices.put("shard1", slice);
     collectionStates.put("collection1", new DocCollection("collection1", slices, null, DocRouter.DEFAULT));
 
-    ClusterState clusterState = new ClusterState(-1,liveNodes, collectionStates);
+    ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
-    ClusterState loadedClusterState = ClusterState.load(-1, bytes, liveNodes);
+    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes);
 
     assertSame("Default state not set to active", Slice.State.ACTIVE, loadedClusterState.getCollection("collection1").getSlice("shard1").getState());
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java
index c082e37..aa272e5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestClusterProperties.java
@@ -41,13 +41,13 @@ public class TestClusterProperties extends SolrCloudTestCase {
 
   @Test
   public void testClusterProperties() throws Exception {
-    assertEquals("false", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false"));
+    assertEquals("false", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false"));
 
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true").process(cluster.getSolrClient());
-    assertEquals("true", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "false"));
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "true").process(cluster.getSolrClient());
+    assertEquals("true", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false"));
 
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false").process(cluster.getSolrClient());
-    assertEquals("false", props.getClusterProperty(ZkStateReader.LEGACY_CLOUD, "true"));
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "false").process(cluster.getSolrClient());
+    assertEquals("false", props.getClusterProperty(ZkStateReader.AUTO_ADD_REPLICAS, "true"));
   }
   
   @Test
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index 5441dea..529b14e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -86,11 +86,6 @@ public class TestPullReplica extends SolrCloudTestCase {
    configureCluster(2) // 2 + random().nextInt(3)
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-    CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud));
-    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
-    assertEquals(0, response.getStatus());
   }
 
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
index 4c4727d..a6a5b67 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplicaErrorHandling.java
@@ -37,7 +37,6 @@ import org.apache.solr.client.solrj.cloud.SocketProxy;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.CollectionStatePredicate;
@@ -94,20 +93,6 @@ public class TestPullReplicaErrorHandling extends SolrCloudTestCase {
       proxies.put(proxy.getUrl(), proxy);
       jettys.put(proxy.getUrl(), jetty);
     }
-    TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
-    while (true) {
-      try {
-        CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false");
-        CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
-        assertEquals(0, response.getStatus());
-        break;
-      } catch (SolrServerException e) {
-        Thread.sleep(50);
-        if (t.hasTimedOut()) {
-          throw e;
-        }
-      }
-    }
   }
   
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
index 11f439c..fa66f0d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java
@@ -97,11 +97,6 @@ public class TestTlogReplica extends SolrCloudTestCase {
     configureCluster(2) // 2 + random().nextInt(3)
         .addConfig("conf", configset("cloud-minimal-inplace-updates"))
         .configure();
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-    CollectionAdminRequest.ClusterProp clusterPropRequest = CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud));
-    CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient());
-    assertEquals(0, response.getStatus());
   }
 
   @AfterClass
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestZkChroot.java b/solr/core/src/test/org/apache/solr/cloud/TestZkChroot.java
deleted file mode 100644
index ee38997..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/TestZkChroot.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import org.apache.solr.SolrJettyTestBase;
-import org.apache.solr.SolrTestCaseJ4;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.core.CoreContainer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore // nocommit
-public class TestZkChroot extends SolrTestCaseJ4 {
-  protected CoreContainer cores = null;
-  private Path home;
-  
-  protected ZkTestServer zkServer;
-  protected Path zkDir;
-  
-  @Override
-  @Before
-  public void setUp() throws Exception {
-    super.setUp();
-
-    zkDir = createTempDir("zkData");
-    zkServer = new ZkTestServer(zkDir);
-    zkServer.run();
-    home = Paths.get(SolrJettyTestBase.legacyExampleCollection1SolrHome());
-    
-  }
-  
-  @Override
-  @After
-  public void tearDown() throws Exception {
-    System.clearProperty("zkHost");
-    
-    if (cores != null) {
-      cores.shutdown();
-      cores = null;
-    }
-    
-    if (null != zkServer) {
-      zkServer.shutdown();
-      zkServer = null;
-    }
-    zkDir = null;
-    
-    super.tearDown();
-  }
-  
-  @Test
-  public void testChrootBootstrap() throws Exception {
-    String chroot = "/foo/bar";
-    
-    System.setProperty("bootstrap_conf", "true");
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-    SolrZkClient zkClient = null;
-    SolrZkClient zkClient2 = null;
-    
-    try {
-      cores = CoreContainer.createAndLoad(home);
-      zkClient = cores.getZkController().getZkClient();
-      
-      assertTrue(zkClient.exists("/clusterstate.json"));
-      assertFalse(zkClient.exists(chroot + "/clusterstate.json"));
-      
-      zkClient2 = new SolrZkClient(zkServer.getZkHost(),
-          AbstractZkTestCase.TIMEOUT);
-      zkClient2.start();
-      assertTrue(zkClient2.exists(chroot + "/clusterstate.json"));
-      assertFalse(zkClient2.exists("/clusterstate.json"));
-    } finally {
-      if (zkClient != null) zkClient.close();
-      if (zkClient2 != null) zkClient2.close();
-    }
-  }
-  
-  @Test
-  @Ignore // nocommit debug
-  public void testNoBootstrapConf() throws Exception {
-    String chroot = "/foo/bar2";
-    
-    System.setProperty("bootstrap_conf", "false");
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-
-    try (SolrZkClient zkClient = new SolrZkClient(zkServer.getZkHost(), AbstractZkTestCase.TIMEOUT).start()) {
-      expectThrows(ZooKeeperException.class,
-          "did not get a top level exception when more then 4 updates failed",
-          () -> {
-        assertFalse("Path '" + chroot + "' should not exist before the test",
-            zkClient.exists(chroot));
-        cores = CoreContainer.createAndLoad(home);
-      });
-      assertFalse("Path shouldn't have been created",
-          zkClient.exists(chroot));// check the path was not created
-    }
-  }
-  
-  @Test
-  public void testWithUploadDir() throws Exception {
-    String chroot = "/foo/bar3";
-    String configName = "testWithUploadDir";
-
-    System.setProperty("bootstrap_conf", "false");
-    System.setProperty("bootstrap_confdir", home + "/collection1/conf");
-    System.setProperty("collection.configName", configName);
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-
-    SolrZkClient zkClient = zkServer.getZkClient();
-    assertFalse("Path '" + chroot + "' should not exist before the test",
-            zkClient.exists(chroot));
-    cores = CoreContainer.createAndLoad(home);
-    assertTrue(
-            "solrconfig.xml should have been uploaded to zk to the correct config directory",
-            zkClient.exists(chroot + ZkConfigManager.CONFIGS_ZKNODE + "/"
-                    + configName + "/solrconfig.xml"));
-
-  }
-  
-  @Test
-  public void testInitPathExists() throws Exception {
-    String chroot = "/foo/bar4";
-
-    System.setProperty("bootstrap_conf", "true");
-    System.setProperty("zkHost", zkServer.getZkHost() + chroot);
-
-    SolrZkClient zkClient = zkServer.getZkClient();
-    zkClient.mkdir("/foo/bar4");
-    assertTrue(zkClient.exists(chroot));
-    assertFalse(zkClient.exists(chroot + "/clusterstate.json"));
-
-    cores = CoreContainer.createAndLoad(home);
-    assertTrue(zkClient.exists(chroot + "/clusterstate.json"));
-
-  }
-}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 4536abf..27c96b7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.ClusterProperties;
-import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -285,7 +284,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
 
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
             CollectionParams.CollectionAction.CREATE.toLower(), ZkStateReader.NODE_NAME_PROP, nodeName, ZkStateReader.NUM_SHARDS_PROP, "1",
-            "name", collectionName, DocCollection.STATE_FORMAT, "2");
+            "name", collectionName);
         zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
 
         HashMap<String, Object> propMap = new HashMap<>();
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
index c7fc320..2eafe73 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
@@ -421,8 +421,6 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
       assertEquals(restoreCollectionName, backupCollection.getMaxShardsPerNode(), restoreCollection.getMaxShardsPerNode());
     }
 
-    assertEquals("Restore collection should use stateFormat=2", 2, restoreCollection.getStateFormat());
-
     //SOLR-12605: Add more docs after restore is complete to see if they are getting added fine
     //explicitly querying the leaders. If we use CloudSolrClient there is no guarantee that we'll hit a nrtReplica
     {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
index afdb676..2a9174e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIAsyncDistributedZkTest.java
@@ -42,6 +42,8 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -100,13 +102,6 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
 
   @Test
   public void testAsyncRequests() throws Exception {
-    boolean legacy = random().nextBoolean();
-    if (legacy) {
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "true").process(cluster.getSolrClient());
-    } else {
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false").process(cluster.getSolrClient());
-    }
-    
     final String collection = "testAsyncOperations";
     final CloudHttp2SolrClient client = cluster.getSolrClient();
 
@@ -212,11 +207,9 @@ public class CollectionsAPIAsyncDistributedZkTest extends SolrCloudTestCase {
       .processAndWait(client, MAX_TIMEOUT_SECONDS);
     assertSame("DeleteReplica did not complete", RequestStatusState.COMPLETED, state);
 
-    if (!legacy) {
-      state = CollectionAdminRequest.deleteCollection(collection)
-          .processAndWait(client, MAX_TIMEOUT_SECONDS);
-      assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
-    }
+    state = CollectionAdminRequest.deleteCollection(collection)
+        .processAndWait(client, MAX_TIMEOUT_SECONDS);
+    assertSame("DeleteCollection did not complete", RequestStatusState.COMPLETED, state);
   }
 
   public void testAsyncIdRaceCondition() throws Exception {
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 7870d38..4cd28af 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -104,11 +104,6 @@ public class ShardSplitTest extends SolrCloudBridgeTestCase {
   @Nightly
   public void test() throws Exception {
 
-    if (TEST_NIGHTLY && usually()) {
-      log.info("Using legacyCloud=false for cluster");
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
-              .process(cloudClient);
-    }
     incompleteOrOverlappingCustomRangeTest();
     splitByUniqueKeyTest();
     splitByRouteFieldTest();
@@ -413,10 +408,6 @@ public class ShardSplitTest extends SolrCloudBridgeTestCase {
   @Nightly
   public void testSplitWithChaosMonkey() throws Exception {
 
-    log.info("Using legacyCloud=false for cluster");
-    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
-            .process(cloudClient);
-
     List<StoppableIndexingThread> indexers = new ArrayList<>();
     try {
       for (int i = 0; i < 1; i++) {
@@ -633,12 +624,6 @@ public class ShardSplitTest extends SolrCloudBridgeTestCase {
   }
 
   private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
-    if (TEST_NIGHTLY && usually()) {
-      log.info("Using legacyCloud=false for cluster");
-      CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
-              .process(cloudClient);
-    }
-
     log.info("Starting testSplitShardWithRule");
     String collectionName = "shardSplitWithRule_" + splitMethod.toLower();
     CollectionAdminRequest.Create createRequest = CollectionAdminRequest.createCollection(collectionName, "_default", 1, 2)
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
index 2a6ac24..d7ccd4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
@@ -53,8 +53,7 @@ public class SimpleCollectionCreateDeleteTest extends AbstractFullDistribZkTestB
     }
     String collectionName = "SimpleCollectionCreateDeleteTest";
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,1,1)
-            .setCreateNodeSet(overseerNode)
-            .setStateFormat(2);
+            .setCreateNodeSet(overseerNode);
 
     NamedList<Object> request = create.process(cloudClient).getResponse();
 
@@ -94,8 +93,7 @@ public class SimpleCollectionCreateDeleteTest extends AbstractFullDistribZkTestB
 
       // create collection again on a node other than the overseer leader
       create = CollectionAdminRequest.createCollection(collectionName,1,1)
-              .setCreateNodeSet(notOverseerNode)
-              .setStateFormat(2);
+              .setCreateNodeSet(notOverseerNode);
       request = create.process(cloudClient).getResponse();
       assertTrue("Collection creation should not have failed", request.get("success") != null);
     }
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index 0f67d1a..ebbf978 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud.api.collections;
 
 import java.io.IOException;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,8 +27,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.collect.Lists;
 import org.apache.lucene.util.LuceneTestCase;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
@@ -38,7 +37,6 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -896,31 +894,6 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
     }
   }
 
-  private void testClusterStateMigration() throws Exception {
-    try (CloudHttp2SolrClient client = createCloudClient(null)) {
-      client.connect();
-
-      CollectionAdminRequest.createCollection("testClusterStateMigration","_default",1,1).setStateFormat(1).process(client);
-
-      assertEquals(1, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
-
-      for (int i = 0; i < 10; i++) {
-        SolrInputDocument doc = new SolrInputDocument();
-        doc.addField("id", "id_" + i);
-        client.add("testClusterStateMigration", doc);
-      }
-      client.commit("testClusterStateMigration");
-
-      CollectionAdminRequest.migrateCollectionFormat("testClusterStateMigration").process(client);
-
-
-      assertEquals(2, client.getZkStateReader().getClusterState().getCollection("testClusterStateMigration").getStateFormat());
-
-      QueryResponse response = client.query("testClusterStateMigration", new SolrQuery("*:*"));
-      assertEquals(10, response.getResults().getNumFound());
-    }
-  }
-  
   private void testCollectionCreationCollectionNameValidation() throws Exception {
     try (CloudHttp2SolrClient client = createCloudClient(null)) {
       ModifiableSolrParams params = new ModifiableSolrParams();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java b/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java
index 67e3244..0be579c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/TestClusterStateMutator.java
@@ -39,7 +39,7 @@ public class TestClusterStateMutator extends SolrTestCaseJ4 {
   }
   
   public void testCreateCollection() throws Exception {
-    ClusterState clusterState = new ClusterState(-1, Collections.<String>emptySet(), Collections.<String, DocCollection>emptyMap());
+    ClusterState clusterState = new ClusterState(Collections.<String>emptySet(), Collections.<String, DocCollection>emptyMap());
     DistribStateManager mockStateManager = mock(DistribStateManager.class);
     SolrCloudManager dataProvider = mock(SolrCloudManager.class);
     when(dataProvider.getDistribStateManager()).thenReturn(mockStateManager);
@@ -55,7 +55,7 @@ public class TestClusterStateMutator extends SolrTestCaseJ4 {
     assertEquals(1, collection.getSlicesMap().size());
     assertEquals(1, collection.getMaxShardsPerNode());
 
-    ClusterState state = new ClusterState(-1, Collections.<String>emptySet(), Collections.singletonMap("xyz", collection));
+    ClusterState state = new ClusterState(Collections.<String>emptySet(), Collections.singletonMap("xyz", collection));
     message = new ZkNodeProps(Utils.makeMap(
         "name", "abc",
         "numShards", "2",
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
index 6e56b8e..3fedc8a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkCollectionPropsCachingTest.java
@@ -48,11 +48,7 @@ public class ZkCollectionPropsCachingTest extends SolrCloudTestCase {
 
   @BeforeClass
   public static void setupClass() throws Exception {
-    Boolean useLegacyCloud = rarely();
-    log.info("Using legacyCloud?: {}", useLegacyCloud);
-
     configureCluster(2)
-        .withProperty(ZkStateReader.LEGACY_CLOUD, String.valueOf(useLegacyCloud))
         .addConfig("conf", configset("cloud-minimal"))
         .configure();
   }
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 67c495b..b539d59 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -19,7 +19,6 @@ package org.apache.solr.cloud.overseer;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.lucene.util.IOUtils;
@@ -34,7 +33,6 @@ import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
 import org.junit.Ignore;
 
@@ -43,101 +41,6 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
   private static final long TIMEOUT = 30;
 
-  /** Uses explicit refresh to ensure latest changes are visible. */
-  public void testStateFormatUpdateWithExplicitRefresh() throws Exception {
-    testStateFormatUpdate(true, true);
-  }
-
-  /** Uses explicit refresh to ensure latest changes are visible. */
-  public void testStateFormatUpdateWithExplicitRefreshLazy() throws Exception {
-    testStateFormatUpdate(true, false);
-  }
-
-  /** ZkStateReader should automatically pick up changes based on ZK watches. */
-  public void testStateFormatUpdateWithTimeDelay() throws Exception {
-    testStateFormatUpdate(false, true);
-  }
-
-  /** ZkStateReader should automatically pick up changes based on ZK watches. */
-  public void testStateFormatUpdateWithTimeDelayLazy() throws Exception {
-    testStateFormatUpdate(false, false);
-  }
-
-  public void testStateFormatUpdate(boolean explicitRefresh, boolean isInteresting) throws Exception {
-    Path zkDir = createTempDir("testStateFormatUpdate");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-
-    SolrZkClient zkClient = server.getZkClient();
-    ZkStateReader reader = null;
-
-    try {
-      server.run();
-
-      ZkController.createClusterZkNodes(zkClient);
-
-      reader = new ZkStateReader(zkClient);
-      reader.createClusterStateWatchersAndUpdate();
-      if (isInteresting) {
-        reader.registerCore("c1");
-      }
-
-      ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-      zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
-
-      {
-        // create new collection with stateFormat = 1
-        DocCollection stateV1 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE);
-        ZkWriteCommand c1 = new ZkWriteCommand("c1", stateV1);
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates(reader.getClusterState());
-
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null));
-        assertNotNull(map.get("c1"));
-        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
-        assertFalse(exists);
-
-       // if (explicitRefresh) {
-          //reader.forceUpdateCollection("c1");
-       // } else {
-          reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS, (n, c) -> c != null);
-       // }
-
-        DocCollection collection = reader.getClusterState().getCollection("c1");
-        assertEquals(1, collection.getStateFormat());
-      }
-
-
-      {
-        // Now update the collection to stateFormat = 2
-        DocCollection stateV2 = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
-        ZkWriteCommand c2 = new ZkWriteCommand("c1", stateV2);
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
-        writer.writePendingUpdates(reader.getClusterState());
-
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null));
-        assertNull(map.get("c1"));
-        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
-        assertTrue(exists);
-
-     //   if (explicitRefresh) {
-    //      reader.forceUpdateCollection("c1");
-      //  } else {
-          reader.waitForState("c1", TIMEOUT, TimeUnit.SECONDS,
-              (n, c) -> c != null && c.getStateFormat() == 2);
-    //    }
-
-        DocCollection collection = reader.getClusterState().getCollection("c1");
-        assertEquals(2, collection.getStateFormat());
-      }
-    } finally {
-      IOUtils.close(reader, zkClient);
-      server.shutdown();
-
-    }
-  }
-
   public void testExternalCollectionWatchedNotWatched() throws Exception{
     Path zkDir = createTempDir("testExternalCollectionWatchedNotWatched");
     ZkTestServer server = new ZkTestServer(zkDir);
@@ -156,9 +59,9 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
       zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
 
-      // create new collection with stateFormat = 2
+      // create new collection
       ZkWriteCommand c1 = new ZkWriteCommand("c1",
-          new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
+          new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
       writer.writePendingUpdates(reader.getClusterState());
 
@@ -193,14 +96,14 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
 
       ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
+      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0);
       ZkWriteCommand wc = new ZkWriteCommand("c1", state);
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
       writer.writePendingUpdates(reader.getClusterState());
       assertTrue(zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
       reader.waitForState("c1", 1, TimeUnit.SECONDS, (liveNodes, collectionState) -> collectionState != null);
 
-      state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
+      state = new DocCollection("c1", new HashMap<>(), Collections.singletonMap("x", "y"), DocRouter.DEFAULT, 0);
       wc = new ZkWriteCommand("c1", state);
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
       writer.writePendingUpdates(reader.getClusterState());
@@ -249,8 +152,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
 
-      // create new collection with stateFormat = 2
-      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE + "/c1/state.json");
+      // create new collection
+      DocCollection state = new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0);
       ZkWriteCommand wc = new ZkWriteCommand("c1", state);
       writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
       writer.writePendingUpdates(reader.getClusterState());
@@ -262,7 +165,6 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
       assertNotNull(ref);
       assertFalse(ref.isLazilyLoaded());
-      assertEquals(2, ref.get().getStateFormat());
     } finally {
       IOUtils.close(reader);
       server.shutdown();
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index d97a6b8..a3fe001 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -82,12 +82,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c2");
         zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c3");
 
-        ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1"));
-        ZkWriteCommand c2 = new ZkWriteCommand("c2",
-            new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c2"));
-        ZkWriteCommand c3 = new ZkWriteCommand("c3",
-            new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c3"));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
         ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
 
         // First write is flushed immediately
@@ -112,46 +109,6 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
     }
   }
 
-  public void testSingleLegacyCollection() throws Exception {
-    Path zkDir = createTempDir("testSingleLegacyCollection");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-
-    SolrZkClient zkClient = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), DEFAULT_ZK_SESSION_TIMEOUT).start();
-      ZkController.createClusterZkNodes(zkClient);
-
-      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
-        reader.createClusterStateWatchersAndUpdate();
-
-        ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-        zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
-
-        // create new collection with stateFormat = 1
-        ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates(reader.getClusterState());
-
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null));
-        assertNotNull(map.get("c1"));
-        boolean exists = zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json");
-        assertFalse(exists);
-      }
-
-    } finally {
-      IOUtils.close(zkClient);
-      server.shutdown();
-
-    }
-  }
-
   public void testSingleExternalCollection() throws Exception {
     Path zkDir = createTempDir("testSingleExternalCollection");
 
@@ -172,29 +129,23 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
 
         zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
 
-        // create new collection with stateFormat = 2
+        // create new collection
         ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json"));
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
         writer.writePendingUpdates(reader.getClusterState());
 
-        Map map = (Map) Utils.fromJSON(zkClient.getData("/clusterstate.json", null, null));
-        assertNull(map.get("c1"));
-        map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null));
+        Map map = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null));
         assertNotNull(map.get("c1"));
       }
-
     } finally {
       IOUtils.close(zkClient);
       server.shutdown();
-
     }
-
-
   }
 
-  public void testExternalModificationToSharedClusterState() throws Exception {
+  public void testExternalModification() throws Exception {
     Path zkDir = createTempDir("testExternalModification");
 
     ZkTestServer server = new ZkTestServer(zkDir);
@@ -215,89 +166,15 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
         zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c2");
 
-        // create collection 1 with stateFormat = 1
-        ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
-        writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates(reader.getClusterState());
-
-
-        ClusterState clusterState = reader.getClusterState(); // keep a reference to the current cluster state object
-        assertTrue(clusterState.hasCollection("c1"));
-        assertFalse(clusterState.hasCollection("c2"));
-
-        // Simulate an external modification to /clusterstate.json
-        byte[] data = zkClient.getData("/clusterstate.json", null, null);
-        zkClient.setData("/clusterstate.json", data, true);
-
-        // enqueue another c1 so that ZkStateWriter has pending updates
-        writer.enqueueUpdate(clusterState, Collections.singletonList(c1), null);
-        assertTrue(writer.hasPendingUpdates());
-
-        // Will trigger flush
-        Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
-        ZkWriteCommand c2 = new ZkWriteCommand("c2",
-            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
-
-        try {
-          writer.enqueueUpdate(clusterState, Collections.singletonList(c2), null); // we are sending in the old cluster state object
-          fail("Enqueue should not have succeeded");
-        } catch (KeeperException.BadVersionException bve) {
-          // expected
-        }
-
-        try {
-          writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c2), null);
-          fail("enqueueUpdate after BadVersionException should not have succeeded");
-        } catch (IllegalStateException e) {
-          // expected
-        }
-
-        try {
-          writer.writePendingUpdates(reader.getClusterState());
-          fail("writePendingUpdates after BadVersionException should not have succeeded");
-        } catch (IllegalStateException e) {
-          // expected
-        }
-      }
-    } finally {
-      IOUtils.close(zkClient);
-      server.shutdown();
-    }
-
-  }
-
-  public void testExternalModificationToStateFormat2() throws Exception {
-    Path zkDir = createTempDir("testExternalModificationToStateFormat2");
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-
-    SolrZkClient zkClient = null;
-
-    try {
-      server.run();
-
-      zkClient = new SolrZkClient(server.getZkAddress(), DEFAULT_ZK_SESSION_TIMEOUT).start();
-      ZkController.createClusterZkNodes(zkClient);
-
-      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
-        reader.createClusterStateWatchersAndUpdate();
-
-        ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
-
-        zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c1");
-        zkClient.mkdir(ZkStateReader.COLLECTIONS_ZKNODE + "/c2");
-
         ClusterState state = reader.getClusterState();
 
-        // create collection 2 with stateFormat = 2
+        // create collection 2
         ZkWriteCommand c2 = new ZkWriteCommand("c2",
-            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.getCollectionPath("c2")));
+            new DocCollection("c2", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0));
         state = writer.enqueueUpdate(state, Collections.singletonList(c2), null);
         assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
 
-        int sharedClusterStateVersion = state.getZkClusterStateVersion();
-        int stateFormat2Version = state.getCollection("c2").getZNodeVersion();
+        int c2Version = state.getCollection("c2").getZNodeVersion();
 
         // Simulate an external modification to /collections/c2/state.json
         byte[] data = zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, null);
@@ -306,8 +183,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         state = reader.getClusterState();
         log.info("Cluster state: {}", state);
         assertTrue(state.hasCollection("c2"));
-        assertEquals(sharedClusterStateVersion, (int) state.getZkClusterStateVersion());
-        assertEquals(stateFormat2Version + 1, state.getCollection("c2").getZNodeVersion());
+        assertEquals(c2Version + 1, state.getCollection("c2").getZNodeVersion());
 
         writer.enqueueUpdate(state, Collections.singletonList(c2), null);
         assertTrue(writer.hasPendingUpdates());
@@ -317,7 +193,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         // Will trigger flush
         Thread.sleep(Overseer.STATE_UPDATE_DELAY+100);
         ZkWriteCommand c1 = new ZkWriteCommand("c1",
-            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0, ZkStateReader.CLUSTER_STATE));
+            new DocCollection("c1", new HashMap<String, Slice>(), new HashMap<String, Object>(), DocRouter.DEFAULT, 0));
 
         try {
           writer.enqueueUpdate(state, Collections.singletonList(c1), null);
diff --git a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
index 0126636..0236991 100644
--- a/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
+++ b/solr/core/src/test/org/apache/solr/core/snapshots/TestSolrCloudSnapshots.java
@@ -246,7 +246,7 @@ public class TestSolrCloudSnapshots extends SolrCloudTestCase {
     CollectionAdminRequest.DeleteSnapshot deleteSnap = new CollectionAdminRequest.DeleteSnapshot(collectionName, commitName);
     deleteSnap.process(solrClient);
 
-    // Wait for a while so that the clusterstate.json updates are propagated to the client side.
+    // Wait for a while so that the cluster state updates are propagated to the client side.
     Thread.sleep(2000);
     collectionState = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
 
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
index df75c3f..4fa6ff7 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/TestCollectionAPIs.java
@@ -88,20 +88,20 @@ public class TestCollectionAPIs extends SolrTestCaseJ4 {
     //test a simple create collection call
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, replicationFactor:2 }}", null,
-        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create}");
+        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create}");
     
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, nrtReplicas:2 }}", null,
-        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create}");
+        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', collection.configName:schemaless, numShards:'2', operation:create}");
     
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, nrtReplicas:2, tlogReplicas:2, pullReplicas:2 }}", null,
-        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', tlogReplicas:'2', pullReplicas:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create}");
+        "{name:newcoll, fromApi:'true', nrtReplicas:'2', replicationFactor:'2', tlogReplicas:'2', pullReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create}");
 
     //test a create collection with custom properties
     compareOutput(apiBag, "/collections", POST,
         "{create:{name:'newcoll', config:'schemaless', numShards:2, replicationFactor:2, properties:{prop1:'prop1val', prop2: prop2val} }}", null,
-        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', stateFormat:'2', operation:create, property.prop1:prop1val, property.prop2:prop2val}");
+        "{name:newcoll, fromApi:'true', replicationFactor:'2', nrtReplicas:'2', collection.configName:schemaless, numShards:'2', operation:create, property.prop1:prop1val, property.prop2:prop2val}");
 
     // nocommit
 //    compareOutput(apiBag, "/collections", POST,
diff --git a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
index 5380a32..32c665e 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
@@ -142,7 +142,7 @@ public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
         "1.2.3.4:9000_",
         "1.2.3.4:9001_solr-2",
     }));
-    ClusterState cs = new ClusterState(0, liveNodes, new HashMap<>());
+    ClusterState cs = new ClusterState(liveNodes, new HashMap<>());
     WhitelistHostChecker checker = new WhitelistHostChecker(null, true);
     Set<String> hostSet = checker.generateWhitelistFromLiveNodes(cs);
     assertThat(hostSet.size(), is(3));
diff --git a/solr/core/src/test/org/apache/solr/util/TestUtils.java b/solr/core/src/test/org/apache/solr/util/TestUtils.java
index beb0bf6..71ff9bc 100644
--- a/solr/core/src/test/org/apache/solr/util/TestUtils.java
+++ b/solr/core/src/test/org/apache/solr/util/TestUtils.java
@@ -316,7 +316,7 @@ public class TestUtils extends SolrTestCaseJ4 {
     assertEquals("b1", Utils.getObjectByPath(sink, true, "k1/k11/a1"));
 
     sink = new HashMap<>();
-    sink.put("legacyCloud", "false");
+    sink.put("autoAddReplicas", "false");
     assertTrue(Utils.mergeJson(sink, (Map<String, Object>) Utils.fromJSONString("collectionDefaults:{numShards:3 , nrtReplicas:2}")));
     assertEquals(3L, Utils.getObjectByPath(sink, true, ImmutableList.of(COLLECTION_DEF, NUM_SHARDS_PROP)));
     assertEquals(2L, Utils.getObjectByPath(sink, true, ImmutableList.of(COLLECTION_DEF, NRT_REPLICAS)));
diff --git a/solr/solr-ref-guide/src/cluster-node-management.adoc b/solr/solr-ref-guide/src/cluster-node-management.adoc
index e477802..4269ed0 100644
--- a/solr/solr-ref-guide/src/cluster-node-management.adoc
+++ b/solr/solr-ref-guide/src/cluster-node-management.adoc
@@ -131,7 +131,7 @@ Add, edit or delete a cluster-wide property.
 === CLUSTERPROP Parameters
 
 `name`::
-The name of the property. Supported properties names are `autoAddReplicas`, `legacyCloud`, `location`, `maxCoresPerNode`, `urlScheme` and `defaultShardPreferences`. Other properties can be set
+The name of the property. Supported property names are `autoAddReplicas`, `location`, `maxCoresPerNode`, `urlScheme` and `defaultShardPreferences`. Other properties can be set
 (for example, if you need them for custom plugins) but they must begin with the prefix `ext.`. Unknown properties that don't begin with `ext.` will be rejected.
 
 `val`::
@@ -498,21 +498,4 @@ http://localhost:8983/solr/admin/collections?action=OVERSEERSTATUS
   ],
   "..."
  }
-----
-
-[[migratestateformat]]
-== MIGRATESTATEFORMAT: Migrate Cluster State
-
-A expert level utility API to move a collection from shared `clusterstate.json` ZooKeeper node (created with `stateFormat=1`, the default in all Solr releases prior to 5.0) to the per-collection `state.json` stored in ZooKeeper (created with `stateFormat=2`, the current default) seamlessly without any application down-time.
-
-`/admin/collections?action=MIGRATESTATEFORMAT&collection=<collection_name>`
-
-=== MIGRATESTATEFORMAT Parameters
-
-`collection`::
-The name of the collection to be migrated from `clusterstate.json` to its own `state.json` ZooKeeper node. This parameter is required.
-
-`async`::
-Request ID to track this action which will be <<collections-api.adoc#asynchronous-calls,processed asynchronously>>.
-
-This API is useful in migrating any collections created prior to Solr 5.0 to the more scalable cluster state format now used by default. If a collection was created in any Solr 5.x version or higher, then executing this command is not necessary.
+----
\ No newline at end of file
diff --git a/solr/solr-ref-guide/src/collection-management.adoc b/solr/solr-ref-guide/src/collection-management.adoc
index 981ad5f..6d2b2a9 100644
--- a/solr/solr-ref-guide/src/collection-management.adoc
+++ b/solr/solr-ref-guide/src/collection-management.adoc
@@ -903,7 +903,6 @@ http://localhost:8983/solr/admin/collections?action=COLSTATUS&collection=getting
         "QTime": 50
     },
     "gettingstarted": {
-        "stateFormat": 2,
         "znodeVersion": 16,
         "properties": {
             "autoAddReplicas": "false",
@@ -1049,7 +1048,6 @@ http://localhost:8983/solr/admin/collections?action=COLSTATUS&collection=getting
         "QTime": 26812
     },
     "gettingstarted": {
-        "stateFormat": 2,
         "znodeVersion": 33,
         "properties": {
             "autoAddReplicas": "false",
diff --git a/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc b/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc
index cdb0b86..1e7774a 100644
--- a/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc
+++ b/solr/solr-ref-guide/src/major-changes-in-solr-9.adoc
@@ -105,8 +105,18 @@ _(raw; not yet edited)_
 
 * SOLR-11775: Return long value for facet count in Json Facet module irrespective of number of shards (hossman, Munendra S N)
 
+* SOLR-12823: Remove /clusterstate.json support, i.e. support for collections created with stateFormat=1 as well as support
+  for the Collection API MIGRATESTATEFORMAT action. Also removes support for the cluster property `legacyCloud` (Solr now always behaves as if it were set to false).
+
 === Upgrade Prerequisites in Solr 9
 
+* Upgrade all collections in stateFormat=1 to stateFormat=2 *before* upgrading to Solr 9, as Solr 9 does not support the
+older format and no longer supports migrating collections from the older format to the current format (previously known
+as stateFormat=2).
+Perform the migration with the Collection API MIGRATESTATEFORMAT action while still running a previous version of Solr.
+See for example https://lucene.apache.org/solr/guide/8_5/cluster-node-management.html#migratestateformat[Solr 8.5 Ref Guide].
+// Can't link directly to .adoc file, need to link to 8.something ref guide as MIGRATESTATEFORMAT no longer exists in 9.0.
+
 === Rolling Upgrades with Solr 9
 
 === Reindexing After Upgrades in Solr 9
diff --git a/solr/solr-ref-guide/src/rule-based-replica-placement.adoc b/solr/solr-ref-guide/src/rule-based-replica-placement.adoc
index 34f990c..b22383b 100644
--- a/solr/solr-ref-guide/src/rule-based-replica-placement.adoc
+++ b/solr/solr-ref-guide/src/rule-based-replica-placement.adoc
@@ -174,4 +174,4 @@ Rules are specified per collection during collection creation as request paramet
 snitch=class:EC2Snitch&rule=shard:*,replica:1,dc:dc1&rule=shard:*,replica:<2,dc:dc3
 ----
 
-These rules are persisted in `clusterstate.json` in ZooKeeper and are available throughout the lifetime of the collection. This enables the system to perform any future node allocation without direct user interaction. The rules added during collection creation can be modified later using the <<collection-management.adoc#modifycollection,MODIFYCOLLECTION>> API.
+These rules are persisted in the collection's `state.json` in ZooKeeper and are available throughout the lifetime of the collection. This enables the system to perform any future node allocation without direct user interaction. The rules added during collection creation can be modified later using the <<collection-management.adoc#modifycollection,MODIFYCOLLECTION>> API.
diff --git a/solr/solr-ref-guide/src/shard-management.adoc b/solr/solr-ref-guide/src/shard-management.adoc
index fa0712d..228d5f9 100644
--- a/solr/solr-ref-guide/src/shard-management.adoc
+++ b/solr/solr-ref-guide/src/shard-management.adoc
@@ -272,7 +272,7 @@ http://localhost:8983/solr/admin/collections?action=CREATESHARD&collection=anImp
 [[deleteshard]]
 == DELETESHARD: Delete a Shard
 
-Deleting a shard will unload all replicas of the shard, remove them from `clusterstate.json`, and (by default) delete the instanceDir and dataDir for each replica. It will only remove shards that are inactive, or which have no range given for custom sharding.
+Deleting a shard will unload all replicas of the shard, remove them from the collection's `state.json`, and (by default) delete the instanceDir and dataDir for each replica. It will only remove shards that are inactive, or which have no range given for custom sharding.
 
 `/admin/collections?action=DELETESHARD&shard=_shardID_&collection=_name_`
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 56a4b74..83b1a20 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -552,7 +552,6 @@ public class Policy implements MapWriter {
     final SolrCloudManager cloudManager;
     final List<Row> matrix;
     final NodeStateProvider nodeStateProvider;
-    final int znodeVersion;
     Set<String> collections = new HashSet<>();
     final Policy policy;
     List<Clause> expandedClauses;
@@ -572,7 +571,6 @@ public class Policy implements MapWriter {
         ParWork.propegateInterrupt(e);
         log.trace("-- session created, can't obtain cluster state", e);
       }
-      this.znodeVersion = state != null ? state.getZNodeVersion() : -1;
       this.nodes = new ArrayList<>(cloudManager.getClusterStateProvider().getLiveNodes());
       this.cloudManager = cloudManager;
       for (String node : nodes) {
@@ -611,7 +609,7 @@ public class Policy implements MapWriter {
     }
 
     private Session(List<String> nodes, SolrCloudManager cloudManager,
-                   List<Row> matrix, List<Clause> expandedClauses, int znodeVersion,
+                   List<Row> matrix, List<Clause> expandedClauses,
                    NodeStateProvider nodeStateProvider, Policy policy, Transaction transaction) {
       this.transaction = transaction;
       this.policy = policy;
@@ -619,7 +617,6 @@ public class Policy implements MapWriter {
       this.cloudManager = cloudManager;
       this.matrix = matrix;
       this.expandedClauses = expandedClauses;
-      this.znodeVersion = znodeVersion;
       this.nodeStateProvider = nodeStateProvider;
       for (Row row : matrix) row.session = this;
     }
@@ -641,7 +638,7 @@ public class Policy implements MapWriter {
     }
 
     Session copy() {
-      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, znodeVersion, nodeStateProvider, policy, transaction);
+      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, nodeStateProvider, policy, transaction);
     }
 
     public Row getNode(String node) {
@@ -690,7 +687,6 @@ public class Policy implements MapWriter {
 
     @Override
     public void writeMap(EntryWriter ew) throws IOException {
-      ew.put("znodeVersion", znodeVersion);
       for (Row row : matrix) {
         ew.put(row.node, row);
       }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
index 0ddd0b4..38ebc8f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java
@@ -241,7 +241,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
     this.requestRLTGenerator = new RequestReplicaListTransformerGenerator();
   }
 
-  /** Sets the cache ttl for DocCollection Objects cached  . This is only applicable for collections which are persisted outside of clusterstate.json
+  /** Sets the cache ttl for DocCollection Objects cached.
    * @param seconds ttl value in seconds
    */
   public void setCollectionCacheTTl(int seconds){
@@ -902,18 +902,16 @@ public abstract class BaseCloudSolrClient extends SolrClient {
           // TODO if we are creating it, we wouldn't find it?
         }
         int collVer = coll.getZNodeVersion();
-        if (coll.getStateFormat()>1) {
-          if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
-          requestedCollections.add(coll);
+        if(requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
+        requestedCollections.add(coll);
 
-          if (stateVerParamBuilder == null) {
-            stateVerParamBuilder = new StringBuilder();
-          } else {
-            stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
-          }
-
-          stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
+        if (stateVerParamBuilder == null) {
+          stateVerParamBuilder = new StringBuilder();
+        } else {
+          stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
         }
+
+        stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
       }
 
       if (stateVerParamBuilder != null) {
@@ -1323,8 +1321,7 @@ public abstract class BaseCloudSolrClient extends SolrClient {
         cacheEntry.setRetriedAt();//we retried and found that it is the same version
         cacheEntry.maybeStale = false;
       } else {
-        if (fetchedCol.getStateFormat() > 1)
-          collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol, retryExpiryTime));
+        collectionStateCache.put(collection, new ExpiringCachedDocCollection(fetchedCol, retryExpiryTime));
       }
       return fetchedCol;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index f8c9c02..10b7bcb 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -139,8 +139,7 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
     Set<String> liveNodes = new HashSet((List<String>)(cluster.get("live_nodes")));
     this.liveNodes = liveNodes;
     liveNodesTimestamp = System.nanoTime();
-    //TODO SOLR-11877 we don't know the znode path; CLUSTER_STATE is probably wrong leading to bad stateFormat
-    ClusterState cs = ClusterState.load(znodeVersion, collectionsMap, liveNodes, ZkStateReader.CLUSTER_STATE);
+    ClusterState cs = ClusterState.createFromCollectionMap(znodeVersion, collectionsMap, liveNodes);
     if (clusterProperties != null) {
       Map<String, Object> properties = (Map<String, Object>) cluster.get("properties");
       if (properties != null) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 24ab2da..d22acb4 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -447,7 +447,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     protected Properties properties;
     protected Boolean autoAddReplicas;
     protected String alias;
-    protected Integer stateFormat;
     protected String[] rule , snitch;
     protected String withCollection;
 
@@ -486,7 +485,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Create setPullReplicas(Integer pullReplicas) { this.pullReplicas = pullReplicas; return this;}
 
     public Create setReplicationFactor(Integer repl) { this.nrtReplicas = repl; return this; }
-    public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
     public Create setRule(String... s){ this.rule = s; return this; }
     public Create setSnitch(String... s){ this.snitch = s; return this; }
 
@@ -508,8 +506,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
     public Integer getNumTlogReplicas() {return tlogReplicas;}
     public Integer getNumPullReplicas() {return pullReplicas;}
 
-    public Integer getStateFormat() { return stateFormat; }
-
     /**
      * Provide the name of the shards to be created, separated by commas
      *
@@ -579,9 +575,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       if (properties != null) {
         addProperties(params, properties);
       }
-      if (stateFormat != null) {
-        params.set(DocCollection.STATE_FORMAT, stateFormat);
-      }
       if (pullReplicas != null) {
         params.set(ZkStateReader.PULL_REPLICAS, pullReplicas);
       }
@@ -2798,35 +2791,6 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
       params.set("property", propertyName);
       return params;
     }
-
-
-  }
-
-  /**
-   * Returns a SolrRequest to migrate a collection state format
-   *
-   * This is an expert-level request, and should not generally be necessary.
-   */
-  public static MigrateClusterState migrateCollectionFormat(String collection) {
-    return new MigrateClusterState(collection);
-  }
-
-  // MIGRATECLUSTERSTATE request
-  public static class MigrateClusterState extends AsyncCollectionAdminRequest {
-
-    protected String collection;
-
-    private MigrateClusterState(String collection) {
-      super(CollectionAction.MIGRATESTATEFORMAT);
-      this.collection = checkNotNull(CoreAdminParams.COLLECTION, collection);
-    }
-
-    @Override
-    public SolrParams getParams() {
-      ModifiableSolrParams params = new ModifiableSolrParams(super.getParams());
-      params.set(CoreAdminParams.COLLECTION, collection);
-      return params;
-    }
   }
 
   /**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index cc86d62..2a2cfe4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -45,6 +45,12 @@ public class ClusterState implements JSONWriter.Writable {
   private final Map<String, CollectionRef> collectionStates, immutableCollectionStates;
   private Set<String> liveNodes;
 
+  // nocommit
+  public ClusterState(Set<String> liveNodes,
+      Map<String, DocCollection> collectionStates) {
+    this(-1, liveNodes, collectionStates);
+  }
+  
   /**
    * Use this constr when ClusterState is meant for consumption.
    */
@@ -224,31 +230,29 @@ public class ClusterState implements JSONWriter.Writable {
     return sb.toString();
   }
 
-  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes) {
-    return load(version, bytes, liveNodes, ZkStateReader.CLUSTER_STATE);
-  }
   /**
-   * Create ClusterState from json string that is typically stored in zookeeper.
+   * Create a ClusterState from Json.
    * 
    * @param version zk version of the clusterstate.json file (bytes)
-   * @param bytes clusterstate.json as a byte array
+   * @param bytes a byte array of a Json representation of a mapping from collection name to the Json representation of a
+   *              {@link DocCollection} as written by {@link #write(JSONWriter)}. It can represent
+   *              one or more collections.
    * @param liveNodes list of live nodes
    * @return the ClusterState
    */
-  public static ClusterState load(Integer version, byte[] bytes, Set<String> liveNodes, String znode) {
-    // System.out.println("######## ClusterState.load:" + version + "v " + (bytes==null ? null : new String(bytes)));
+  public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(version, liveNodes, Collections.<String, DocCollection>emptyMap());
     }
     Map<String, Object> stateMap = (Map<String, Object>) Utils.fromJSON(bytes);
-    return load(version, stateMap, liveNodes, znode);
+    return createFromCollectionMap(version, stateMap, liveNodes);
   }
 
-  public static ClusterState load(Integer version, Map<String, Object> stateMap, Set<String> liveNodes, String znode) {
+  public static ClusterState createFromCollectionMap(int version, Map<String, Object> stateMap, Set<String> liveNodes) {
     Map<String,CollectionRef> collections = new LinkedHashMap<>(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
       String collectionName = entry.getKey();
-      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version, znode);
+      DocCollection coll = collectionFromObjects(collectionName, (Map<String,Object>)entry.getValue(), version);
       collections.put(collectionName, new CollectionRef(coll));
     }
 
@@ -256,7 +260,7 @@ public class ClusterState implements JSONWriter.Writable {
   }
 
   // TODO move to static DocCollection.loadFromMap
-  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, Integer version, String znode) {
+  private static DocCollection collectionFromObjects(String name, Map<String, Object> objs, int version) {
     Map<String,Object> props;
     Map<String,Slice> slices;
 
@@ -283,20 +287,16 @@ public class ClusterState implements JSONWriter.Writable {
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return new DocCollection(name, slices, props, router, version, znode);
+    return new DocCollection(name, slices, props, router, version);
   }
 
   @Override
   public void write(JSONWriter jsonWriter) {
     LinkedHashMap<String , DocCollection> map = new LinkedHashMap<>();
     for (Entry<String, CollectionRef> e : collectionStates.entrySet()) {
-      // using this class check to avoid fetching from ZK in case of lazily loaded collection
       if (e.getValue().getClass() == CollectionRef.class) {
-        // check if it is a lazily loaded collection outside of clusterstate.json
         DocCollection coll = e.getValue().get();
-        if (coll.getStateFormat() == 1) {
-          map.put(coll.getName(),coll);
-        }
+        map.put(coll.getName(),coll);
       }
     }
     jsonWriter.write(map);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index cb0881a..686e74b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -50,7 +50,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   public static final String DOC_ROUTER = "router";
   public static final String SHARDS = "shards";
-  public static final String STATE_FORMAT = "stateFormat";
   public static final String RULE = "rule";
   public static final String SNITCH = "snitch";
 
@@ -63,7 +62,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Map<String, List<Replica>> nodeNameReplicas;
   private final Map<String, List<Replica>> nodeNameLeaderReplicas;
   private final DocRouter router;
-  private final String znode;
 
   private final Integer replicationFactor;
   private final Integer numNrtReplicas;
@@ -75,15 +73,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Boolean readOnly;
 
   public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, Integer.MAX_VALUE, ZkStateReader.CLUSTER_STATE);
+    this(name, slices, props, router, Integer.MAX_VALUE);
   }
 
   /**
    * @param name  The name of the collection
    * @param slices The logical shards of the collection.  This is used directly and a copy is not made.
    * @param props  The properties of the slice.  This is used directly and a copy is not made.
+   * @param zkVersion The version of the Collection node in Zookeeper (used for conditional updates).
    */
-  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion, String znode) {
+  public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router, int zkVersion) {
     super(props==null ? props = new HashMap<>() : props);
     this.znodeVersion = zkVersion == -1 ? 0 : zkVersion;
     this.name = name;
@@ -118,7 +117,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
     this.router = router;
-    this.znode = znode == null? ZkStateReader.CLUSTER_STATE : znode;
     assert name != null && slices != null;
   }
 
@@ -171,7 +169,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @return the resulting DocCollection
    */
   public DocCollection copyWithSlices(Map<String, Slice> slices){
-    return new DocCollection(getName(), slices, propMap, router, znodeVersion,znode);
+    return new DocCollection(getName(), slices, propMap, router, znodeVersion);
   }
 
   /**
@@ -269,10 +267,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return maxShardsPerNode == 0 ? Integer.MAX_VALUE : maxShardsPerNode;
   }
 
-  public String getZNode(){
-    return znode;
-  }
-
 
   public DocRouter getRouter() {
     return router;
@@ -284,7 +278,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
 
   @Override
   public String toString() {
-    return "DocCollection("+name+":" + znode + ":v=" + znodeVersion + ")=" + toJSONString(this);
+    return "DocCollection("+name+":" + ":v=" + znodeVersion + ")=" + toJSONString(this);
   }
 
   @Override
@@ -401,7 +395,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     if (that instanceof DocCollection == false)
       return false;
     DocCollection other = (DocCollection) that;
-    return super.equals(that) && Objects.equals(this.znode, other.znode) && this.znodeVersion == other.znodeVersion;
+    return super.equals(that) && Objects.equals(this.name, other.name) && this.znodeVersion == other.znodeVersion;
   }
 
   /**
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index dbf6a33..d3d23d8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -812,14 +812,8 @@ public class SolrZkClient implements Closeable {
     string.append(dent).append(path).append(" (c=").append(children.size()).append(",v=" + (stat == null ? "?" : stat.getVersion()) + ")").append(NEWL);
     if (data != null) {
       String dataString = new String(data, StandardCharsets.UTF_8);
-      if ((stat != null && stat.getDataLength() < MAX_BYTES_FOR_ZK_LAYOUT_DATA_SHOW && dataString.split("\\r\\n|\\r|\\n").length < 12) || path.endsWith("state.json")) {
-        if (path.endsWith(".xml")) {
-          // this is the cluster state in xml format - lets pretty print
-          dataString = prettyPrint(path, dataString);
-        }
-
-        string.append(dent).append("DATA (" + (stat != null ? stat.getDataLength() : "?") + "b) :\n").append(dent).append("    ")
-                .append(dataString.replaceAll("\n", "\n" + dent + "    ")).append(NEWL);
+      if (!path.endsWith(".txt") && !path.endsWith(".xml")) {
+        string.append(dent).append("DATA:\n").append(dent).append("    ").append(dataString.replaceAll("\n", "\n" + dent + "    ")).append(NEWL);
       } else {
         string.append(dent).append("DATA (" + (stat != null ? stat.getDataLength() : "?") + "b) : ...supressed...").append(NEWL);
       }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7ddb17c..095524b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -107,8 +107,13 @@ public class ZkStateReader implements SolrCloseable {
   public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
   public static final String ALIASES = "/aliases.json";
-  public static final String CLUSTER_STATE = "/clusterstate.json";
   public static final String STATE_JSON = "/state.json";
+  /**
+   * This ZooKeeper file is no longer used starting with Solr 9, but the name is kept around to check whether it
+   * is still present and non-empty (in case of an upgrade from a previous Solr version). It used to contain the
+   * collection state for all collections in the cluster.
+   */
+  public static final String UNSUPPORTED_CLUSTER_STATE = "/clusterstate.json";
   public static final String CLUSTER_PROPS = "/clusterprops.json";
   public static final String COLLECTION_PROPS_ZKNODE = "collectionprops.json";
   public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
@@ -135,7 +140,6 @@ public class ZkStateReader implements SolrCloseable {
   public static final String CONFIGS_ZKNODE = "/configs";
   public final static String CONFIGNAME_PROP = "configName";
 
-  public static final String LEGACY_CLOUD = "legacyCloud";
   public static final String SAMPLE_PERCENTAGE = "samplePercentage";
 
   public static final String CREATE_NODE_SET_EMPTY = "EMPTY";
@@ -156,9 +160,9 @@ public class ZkStateReader implements SolrCloseable {
   private final CloseTracker closeTracker;
 
   /**
-   * A view of the current state of all collections; combines all the different state sources into a single view.
+   * A view of the current state of all collections.
    */
-  protected volatile ClusterState clusterState = new ClusterState(Collections.emptySet(), Collections.emptyMap(), -1);
+  protected volatile ClusterState clusterState = new ClusterState(Collections.emptySet(), Collections.emptyMap());
 
   private final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = Integer.parseInt(System.getProperty("zkReaderGetLeaderRetryTimeoutMs", "1000"));
 
@@ -168,12 +172,12 @@ public class ZkStateReader implements SolrCloseable {
   public static final String ELECTION_NODE = "election";
 
   /**
-   * Collections with format2 state.json, "interesting" and actively watched.
+   * "Interesting" and actively watched Collections.
    */
   private final ConcurrentHashMap<String, DocCollection> watchedCollectionStates = new ConcurrentHashMap<>();
 
   /**
-   * Collections with format2 state.json, not "interesting" and not actively watched.
+   * Collections that are not "interesting" and not actively watched.
    */
   private final ConcurrentHashMap<String, LazyCollectionRef> lazyCollectionStates = new ConcurrentHashMap<>();
 
@@ -183,7 +187,7 @@ public class ZkStateReader implements SolrCloseable {
   private final ConcurrentHashMap<String, VersionedCollectionProps> watchedCollectionProps = new ConcurrentHashMap<>();
 
   /**
-   * Collection properties being actively watched
+   * Watchers of Collection properties
    */
   private final ConcurrentHashMap<String, PropsWatcher> collectionPropsWatchers = new ConcurrentHashMap<>();
 
@@ -259,7 +263,6 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   public static final Set<String> KNOWN_CLUSTER_PROPS = Set.of(
-      LEGACY_CLOUD,
       URL_SCHEME,
       AUTO_ADD_REPLICAS,
       CoreAdminParams.BACKUP_LOCATION,
@@ -462,7 +465,6 @@ public class ZkStateReader implements SolrCloseable {
       // on reconnect of SolrZkClient force refresh and re-add watches.
       loadClusterProperties();
       refreshLiveNodes(new LiveNodeWatcher());
-      refreshStateFormat2Collections();
       refreshCollectionList(new CollectionsChildWatcher());
 
       refreshAliases(aliasesManager);
@@ -553,13 +555,11 @@ public class ZkStateReader implements SolrCloseable {
 
     Set<String> liveNodes = this.liveNodes; // volatile read
 
-    // Legacy clusterstate is authoritative, for backwards compatibility.
-    // To move a collection's state to format2, first create the new state2 format node, then remove legacy entry.
     Map<String, ClusterState.CollectionRef> result = new LinkedHashMap<>();
 
-    // Add state format2 collections, but don't override legacy collection states.
+    // Add collections
     for (Map.Entry<String, DocCollection> entry : watchedCollectionStates.entrySet()) {
-      result.putIfAbsent(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
+      result.put(entry.getKey(), new ClusterState.CollectionRef(entry.getValue()));
     }
 
     // Finally, add any lazy collections that aren't already accounted for.
@@ -594,9 +594,9 @@ public class ZkStateReader implements SolrCloseable {
   }
 
   /**
-   * Refresh state format2 collections.
+   * Refresh collections.
    */
-  private void refreshStateFormat2Collections() {
+  private void refreshCollections() {
     for (String coll : collectionWatches.keySet()) {
       new StateWatcher(coll).refreshAndWatch();
     }
@@ -606,17 +606,7 @@ public class ZkStateReader implements SolrCloseable {
   private final Object refreshCollectionListLock = new Object();
 
   /**
-   * Search for any lazy-loadable state format2 collections.
-   * <p>
-   * A stateFormat=1 collection which is not interesting to us can also
-   * be put into the {@link #lazyCollectionStates} map here. But that is okay
-   * because {@link #constructState(Set)} will give priority to collections in the
-   * shared collection state over this map.
-   * In fact this is a clever way to avoid doing a ZK exists check on
-   * the /collections/collection_name/state.json znode
-   * Such an exists check is done in {@link ClusterState#hasCollection(String)} and
-   * {@link ClusterState#getCollectionsMap()} methods
-   * have a safeguard against exposing wrong collection names to the users
+   * Search for any lazy-loadable collections.
    */
   private void refreshCollectionList(Watcher watcher) throws KeeperException, InterruptedException {
     synchronized (refreshCollectionListLock) {
@@ -1301,7 +1291,6 @@ public class ZkStateReader implements SolrCloseable {
     }
   }
 
-
   /**
    * Watches collection properties
    */
@@ -1485,8 +1474,7 @@ public class ZkStateReader implements SolrCloseable {
       try {
         Stat stat = new Stat();
         byte[] data = zkClient.getData(collectionPath, watcher, stat);
-        ClusterState state = ClusterState.load(stat.getVersion(), data,
-            Collections.<String>emptySet(), collectionPath);
+        ClusterState state = ClusterState.createFromJson(stat.getVersion(), data, Collections.emptySet());
         ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
         return collectionRef == null ? null : collectionRef.get();
       } catch (KeeperException.NoNodeException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index 3e8ee37..8e8a027 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -77,7 +77,7 @@ public interface CollectionParams {
    * <p>Some of these actions are also used over the cluster state update queue at <code>/overseer/queue</code> and have a
    * different (though related) meaning there. These actions are:
    * {@link #CREATE}, {@link #DELETE}, {@link #CREATESHARD}, {@link #DELETESHARD}, {@link #ADDREPLICA}, {@link #ADDREPLICAPROP},
-   * {@link #DELETEREPLICAPROP}, {@link #BALANCESHARDUNIQUE}, {@link #MODIFYCOLLECTION} and {@link #MIGRATESTATEFORMAT}.</p>
+   * {@link #DELETEREPLICAPROP}, {@link #BALANCESHARDUNIQUE} and {@link #MODIFYCOLLECTION}.</p>
    */
   enum CollectionAction {
     CREATE(true, LockLevel.COLLECTION),
@@ -112,7 +112,6 @@ public interface CollectionParams {
     BALANCESHARDUNIQUE(true, LockLevel.SHARD),
     REBALANCELEADERS(true, LockLevel.COLLECTION),
     MODIFYCOLLECTION(true, LockLevel.COLLECTION),
-    MIGRATESTATEFORMAT(true, LockLevel.CLUSTER),
     BACKUP(true, LockLevel.COLLECTION),
     RESTORE(true, LockLevel.COLLECTION),
     CREATESNAPSHOT(true, LockLevel.COLLECTION),
diff --git a/solr/solrj/src/resources/apispec/cluster.Commands.json b/solr/solrj/src/resources/apispec/cluster.Commands.json
index 069cd1d..b72b67c 100644
--- a/solr/solrj/src/resources/apispec/cluster.Commands.json
+++ b/solr/solrj/src/resources/apispec/cluster.Commands.json
@@ -75,9 +75,6 @@
       "documentation": "https://lucene.apache.org/solr/guide/cluster-node-management.html#clusterprop",
       "description": "Add, edit, or delete a cluster-wide property.",
       "properties": {
-        "legacyCloud": {
-          "type": "boolean"
-        },
         "urlScheme": {
           "type": "string"
         },
diff --git a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json
index ae7c36a..50c1e3b 100644
--- a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json
+++ b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.delete.json
@@ -1,6 +1,6 @@
 {
   "documentation": "https://lucene.apache.org/solr/guide/shard-management.html#deleteshard",
-  "description": "Deletes a shard by unloading all replicas of the shard, removing it from clusterstate.json, and by default deleting the instanceDir and dataDir. Only inactive shards or those which have no range for custom sharding will be deleted.",
+  "description": "Deletes a shard by unloading all replicas of the shard, removing it from the collection's state.json, and by default deleting the instanceDir and dataDir. Only inactive shards or those which have no range for custom sharding will be deleted.",
   "methods": [
     "DELETE"
   ],
diff --git a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json
index 2d4691d..16efecb 100644
--- a/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json
+++ b/solr/solrj/src/resources/apispec/collections.collection.shards.shard.replica.delete.json
@@ -1,6 +1,6 @@
 {
   "documentation": "https://lucene.apache.org/solr/guide/replica-management.html#deletereplica",
-  "description": "Deletes a replica. If the responding node is up, the core is unloaded, the entry removed from clusterstate.json, and the instanceDir and dataDir removed. If the node is not up, the entry for the replica is removed from clusterstate.json; if the nodes comes up later, the replica is automatically de-registered.",
+  "description": "Deletes a replica. If the responding node is up, the core is unloaded, the entry removed from the collection's state.json, and the instanceDir and dataDir removed. If the node is not up, the entry for the replica is removed from state.json; if the node comes up later, the replica is automatically de-registered.",
   "methods": [
     "DELETE"
   ],
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
index eb75e4c..153228c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy.java
@@ -83,7 +83,6 @@ import static org.apache.solr.client.solrj.cloud.autoscaling.TestPolicy2.loadFro
 import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORES;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.FREEDISK;
 import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.REPLICA;
-import static org.apache.solr.common.cloud.ZkStateReader.CLUSTER_STATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
 
@@ -143,9 +142,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
 
 
   public void testWithCollection() {
-    ClusterState clusterState = ClusterState.load(1,
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1,
         (Map) loadFromResource("testWithCollection.json"),
-        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"), CLUSTER_STATE);
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() throws IOException {
@@ -235,9 +234,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
 
   public void testWithCollectionSuggestions() {
     ClusterState clusterState =
-        ClusterState.load(1,
+        ClusterState.createFromCollectionMap(1,
             (Map) loadFromResource("testWithCollectionSuggestions.json"),
-            ImmutableSet.of("node1", "node2", "node3", "node4", "node5"), CLUSTER_STATE);
+            ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() throws IOException {
@@ -326,11 +325,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
   }
 
   public void testWithCollectionMoveVsAddSuggestions() throws IOException {
-    ClusterState clusterState = ClusterState.load(1,
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1,
         (Map) loadFromResource("testWithCollectionMoveVsAddSuggestions.json"),
-        ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6"),
-        CLUSTER_STATE
-    );
+        ImmutableSet.of("node1", "node2", "node3", "node4", "node5", "node6"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() {
@@ -434,9 +431,9 @@ public class TestPolicy extends SolrTestCaseJ4 {
   }
 
   public void testWithCollectionMoveReplica() {
-    ClusterState clusterState = ClusterState.load(1,
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1,
         (Map) loadFromResource("testWithCollectionMoveReplica.json"),
-        ImmutableSet.of("node2", "node3", "node4", "node5"), CLUSTER_STATE);
+        ImmutableSet.of("node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
       public ClusterState getClusterState() throws IOException {
@@ -810,7 +807,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
         "}";
 
 
-    ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
+    ClusterState clusterState = ClusterState.createFromJson(1, clusterStateStr.getBytes(UTF_8),
         ImmutableSet.of("node1", "node2", "node3", "node4", "node5"));
     DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
       @Override
@@ -1192,7 +1189,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
         return new DelegatingClusterStateProvider(null) {
           @Override
           public ClusterState getClusterState() throws IOException {
-            return ClusterState.load(0, new HashMap<>(), getLiveNodes(), CLUSTER_STATE);
+            return ClusterState.createFromCollectionMap(0, new HashMap<>(), getLiveNodes());
           }
 
           @Override
@@ -2939,8 +2936,8 @@ public class TestPolicy extends SolrTestCaseJ4 {
           }
 
           @Override
-          public ClusterState getClusterState() throws IOException {
-            return ClusterState.load(0, clusterState, getLiveNodes(), ZkStateReader.getCollectionPath("c1"));
+          public ClusterState getClusterState() {
+            return ClusterState.createFromCollectionMap(0, clusterState, getLiveNodes());
           }
 
           @Override
@@ -2990,7 +2987,7 @@ public class TestPolicy extends SolrTestCaseJ4 {
     Map clusterStateMap = (Map) m.remove("clusterstate");
     Map replicaInfoMap = (Map) m.remove("replicaInfo");
 
-    ClusterState clusterState = ClusterState.load(1, clusterStateMap, ImmutableSet.of("node1", "node2"), CLUSTER_STATE);
+    ClusterState clusterState = ClusterState.createFromCollectionMap(1, clusterStateMap, ImmutableSet.of("node1", "node2"));
 
     List<String> shards = Arrays.asList("shard1", "shard2", "shard3");
 
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java
index ce3d8d4..b230773 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/cloud/autoscaling/TestPolicy2.java
@@ -177,7 +177,7 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
   static SolrCloudManager createCloudManager(Map m, Map meta) {
     Map nodeVals = (Map) meta.get("nodeValues");
     List<Map> replicaVals = (List<Map>) meta.get("replicaValues");
-    ClusterState clusterState = ClusterState.load(0, m, Collections.emptySet(), null);
+    ClusterState clusterState = ClusterState.createFromCollectionMap(0, m, Collections.emptySet());
     Map<String, AtomicInteger> coreCount = new LinkedHashMap<>();
     Set<String> nodes = new HashSet<>(nodeVals.keySet());
     clusterState.getCollectionStates().forEach((s, collectionRef) -> collectionRef.get()
@@ -307,7 +307,7 @@ public class TestPolicy2 extends SolrTestCaseJ4 {
         if (clusterState == null) {
           Map map = (Map) getObjectByPath(m, false, "cluster/collections");
           if (map == null) map = new HashMap<>();
-          clusterState = ClusterState.load(0, map, liveNodes, "/clusterstate.json");
+          clusterState = ClusterState.createFromCollectionMap(0, map, liveNodes);
         }
 
         return new DelegatingClusterStateProvider(null) {
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index b647905..b8cb359 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -86,8 +86,7 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
         .withLBHttpSolrClient(mockLbclient)
         .build()) {
       livenodes.addAll(ImmutableSet.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
-      ClusterState cs = ClusterState.load(1, coll1State.getBytes(UTF_8),
-          Collections.emptySet(), "/collections/gettingstarted/state.json");
+      ClusterState cs = ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet());
       refs.put(collName, new Ref(collName));
       colls.put(collName, cs.getCollectionOrNull(collName));
       responses.put("request", o -> {
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
index eccb382..939e05d 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCloudCollectionsListeners.java
@@ -196,114 +196,4 @@ public class TestCloudCollectionsListeners extends SolrCloudTestCase {
 
     client.getZkStateReader().removeCloudCollectionsListener(watcher1);
   }
-
-  @Test
-  // commented out on: 24-Dec-2018   @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 17-Aug-2018
-  public void testWatchesWorkForBothStateFormats() throws Exception {
-    CloudHttp2SolrClient client = cluster.getSolrClient();
-
-    Map<Integer, Set<String>> oldResults = new HashMap<>();
-    Map<Integer, Set<String>> newResults = new HashMap<>();
-
-    CloudCollectionsListener watcher1 = (oldCollections, newCollections) -> {
-      log.info("New set of collections: {}, {}", oldCollections, newCollections);
-      oldResults.put(1, oldCollections);
-      newResults.put(1, newCollections);
-    };
-    client.getZkStateReader().registerCloudCollectionsListener(watcher1);
-    CloudCollectionsListener watcher2 = (oldCollections, newCollections) -> {
-      log.info("New set of collections: {}, {}", oldCollections, newCollections);
-      oldResults.put(2, oldCollections);
-      newResults.put(2, newCollections);
-    };
-    client.getZkStateReader().registerCloudCollectionsListener(watcher2);
-
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after registration", 0, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after registration", 0, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener has new collections with size > 0 after registration", 0, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener has new collections with size > 0 after registration", 0, newResults.get(2).size());
-
-    // Creating old state format collection
-
-    CollectionAdminRequest.createCollection("testcollection1", "config", 4, 1)
-        .setStateFormat(1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    cluster.waitForActiveCollection("testcollection1", 4, 4);
-
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after collection created with old stateFormat", 0, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener has old collections with size > 0 after collection created with old stateFormat", 0, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 1, newResults.get(1).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(1).contains("testcollection1"));
-    assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 1, newResults.get(2).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(2).contains("testcollection1"));
-
-    // Creating new state format collection
-
-    CollectionAdminRequest.createCollection("testcollection2", "config", 4, 1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    cluster.waitForActiveCollection("testcollection2", 4, 4);
-
-    assertEquals("CloudCollectionsListener has incorrect old collections after collection created with new stateFormat", 1, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener has incorrect old collections after collection created with new stateFormat", 1, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener not updated with created collection with new stateFormat", 2, newResults.get(1).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with new stateFormat", newResults.get(1).contains("testcollection2"));
-    assertEquals("CloudCollectionsListener not updated with created collection with new stateFormat", 2, newResults.get(2).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with new stateFormat", newResults.get(2).contains("testcollection2"));
-
-    client.getZkStateReader().removeCloudCollectionsListener(watcher2);
-
-    // Creating old state format collection
-
-    CollectionAdminRequest.createCollection("testcollection3", "config", 4, 1)
-        .setStateFormat(1)
-        .processAndWait(client, MAX_WAIT_TIMEOUT);
-    cluster.waitForActiveCollection("testcollection3", 4, 4);
-
-    assertEquals("CloudCollectionsListener has incorrect old collections after collection created with old stateFormat", 2, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener updated after removal", 1, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener not updated with created collection with old stateFormat", 3, newResults.get(1).size());
-    assertTrue("CloudCollectionsListener not updated with created collection with old stateFormat", newResults.get(1).contains("testcollection3"));
-    assertEquals("CloudCollectionsListener updated after removal", 2, newResults.get(2).size());
-    assertFalse("CloudCollectionsListener updated after removal", newResults.get(2).contains("testcollection3"));
-
-    // Adding back listener
-    client.getZkStateReader().registerCloudCollectionsListener(watcher2);
-
-    assertEquals("CloudCollectionsListener has old collections after registration", 0, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener doesn't have all collections after registration", 3, newResults.get(2).size());
-
-    // Deleting old state format collection
-
-    CollectionAdminRequest.deleteCollection("testcollection1").processAndWait(client, MAX_WAIT_TIMEOUT);
-
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 3, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 3, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 2, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 2, newResults.get(2).size());
-    assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(1).contains("testcollection1"));
-    assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(2).contains("testcollection1"));
-
-    CollectionAdminRequest.deleteCollection("testcollection2").processAndWait(client, MAX_WAIT_TIMEOUT);
-
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 2, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 2, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 1, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 1, newResults.get(2).size());
-    assertFalse("CloudCollectionsListener not updated with deleted collection with new stateFormat", newResults.get(1).contains("testcollection2"));
-    assertFalse("CloudCollectionsListener not updated with deleted collection with new stateFormat", newResults.get(2).contains("testcollection2"));
-
-    client.getZkStateReader().removeCloudCollectionsListener(watcher1);
-
-    CollectionAdminRequest.deleteCollection("testcollection3").processAndWait(client, MAX_WAIT_TIMEOUT);
-
-    assertEquals("CloudCollectionsListener updated after removal", 2, oldResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have all old collections after collection removal", 1, oldResults.get(2).size());
-    assertEquals("CloudCollectionsListener updated after removal", 1, newResults.get(1).size());
-    assertEquals("CloudCollectionsListener doesn't have correct new collections after collection removal", 0, newResults.get(2).size());
-    assertTrue("CloudCollectionsListener updated after removal", newResults.get(1).contains("testcollection3"));
-    assertFalse("CloudCollectionsListener not updated with deleted collection with old stateFormat", newResults.get(2).contains("testcollection3"));
-
-    client.getZkStateReader().removeCloudCollectionsListener(watcher2);
-  }
-
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
index 1ef595c..373e3f0 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestCollectionStateWatchers.java
@@ -303,24 +303,4 @@ public class TestCollectionStateWatchers extends SolrCloudTestCase {
     await("CollectionStateWatcher should be removed").atMost(MAX_WAIT_TIMEOUT, TimeUnit.SECONDS)
         .until(() -> client.getZkStateReader().getStateWatchers("test_collection").size() == 0);
   }
-
-  @Test
-  public void testWatchesWorkForStateFormat1() throws Exception {
-
-    final CloudHttp2SolrClient client = cluster.getSolrClient();
-
-    Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                              (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
-
-    CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("CollectionStateWatcher not notified of stateformat=1 collection creation",
-               future.get());
-
-    Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                                (n, c) -> c != null && c.getStateFormat() == 2);
-
-    CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("CollectionStateWatcher did not persist over state format migration", migrated.get());
-  }
 }
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
index 5b4e6c7..f6cd999 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestDocCollectionWatcher.java
@@ -234,27 +234,4 @@ public class TestDocCollectionWatcher extends SolrCloudTestCase {
 
     assertTrue("DocCollectionWatcher not notified of delete call", future.get());
   }
-  
-  @Test
-  public void testWatchesWorkForStateFormat1() throws Exception {
-
-    final CloudHttp2SolrClient client = cluster.getSolrClient();
-
-    Future<Boolean> future = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                              (c) -> (null != c) );
-
-    CollectionAdminRequest.createCollection("stateformat1", "config", 1, 1).setStateFormat(1)
-      .processAndWait(client, MAX_WAIT_TIMEOUT);
-    client.waitForState("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                         (n, c) -> DocCollection.isFullyActive(n, c, 1, 1));
-    
-    assertTrue("DocCollectionWatcher not notified of stateformat=1 collection creation",
-               future.get());
-
-    Future<Boolean> migrated = waitInBackground("stateformat1", MAX_WAIT_TIMEOUT, TimeUnit.SECONDS,
-                                                (c) -> c != null && c.getStateFormat() == 2);
-
-    CollectionAdminRequest.migrateCollectionFormat("stateformat1").processAndWait(client, MAX_WAIT_TIMEOUT);
-    assertTrue("DocCollectionWatcher did not persist over state format migration", migrated.get());
-  }
 }
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index 73cacf0..372e87c 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -422,16 +422,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     }
   }
 
-  protected String defaultStateFormat = String.valueOf( 1 + random().nextInt(2));
-
-  protected String getStateFormat()  {
-    String stateFormat = System.getProperty("tests.solr.stateFormat", null);
-    if (stateFormat != null)  {
-      defaultStateFormat = stateFormat;
-    }
-    return defaultStateFormat; // random
-  }
-
   protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
     List<JettySolrRunner> jettys = Collections.synchronizedList(new ArrayList<>());
     List<SolrClient> clients = Collections.synchronizedList(new ArrayList<>());
@@ -447,7 +437,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     // jetty instances are started)
     assertEquals(0, CollectionAdminRequest
             .createCollection(DEFAULT_COLLECTION, "_default", sliceCount, 1) // not real rep factor!
-            .setStateFormat(Integer.parseInt(getStateFormat()))
             .setCreateNodeSet("") // empty node set prevents creation of cores
             .process(cloudClient).getStatus());
 
@@ -1791,10 +1780,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
       collectionInfos.put(collectionName, list);
     }
     params.set("name", collectionName);
-    if ("1".equals(getStateFormat()) ) {
-      log.info("Creating collection with stateFormat=1: {}", collectionName);
-      params.set(DocCollection.STATE_FORMAT, "1");
-    }
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
 
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
index 78bf83a..16cf2ea 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java
@@ -51,6 +51,8 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.CloseTracker;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -842,7 +844,7 @@ public class ZkTestServer implements Closeable {
   // static to share with distrib test
   public void buildZooKeeper(File solrhome, String config, String schema) throws Exception {
     // this workaround is acceptable until we remove legacyCloud because we just init a single core here
-    String defaultClusterProps = "{\"" + ZkStateReader.LEGACY_CLOUD + "\":\"false\"}";
+    String defaultClusterProps = "{}";
     chRootClient.makePath("/solr" + ZkStateReader.CLUSTER_PROPS, defaultClusterProps.getBytes(StandardCharsets.UTF_8),
             CreateMode.PERSISTENT, false);
   }