You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 21:31:19 UTC

[09/11] lucene-solr:branch_7x: SOLR-11285: Simulation framework for autoscaling.

SOLR-11285: Simulation framework for autoscaling.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b1ce5e20
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b1ce5e20
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b1ce5e20

Branch: refs/heads/branch_7x
Commit: b1ce5e2085ad6dda1a5adf5d30c4822d0fe4ce59
Parents: 99f089b
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Dec 14 12:13:05 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Dec 14 22:30:49 2017 +0100

----------------------------------------------------------------------
 solr/CHANGES.txt                                |    4 +
 .../org/apache/solr/cloud/ActionThrottle.java   |   14 +-
 .../org/apache/solr/cloud/AddReplicaCmd.java    |  149 +-
 .../src/java/org/apache/solr/cloud/Assign.java  |   24 +-
 .../solr/cloud/CloudConfigSetService.java       |    2 +-
 .../java/org/apache/solr/cloud/CloudUtil.java   |    7 +
 .../apache/solr/cloud/CreateCollectionCmd.java  |  244 ++--
 .../org/apache/solr/cloud/CreateShardCmd.java   |  102 +-
 .../apache/solr/cloud/DeleteCollectionCmd.java  |    9 +-
 .../org/apache/solr/cloud/DeleteShardCmd.java   |    9 +-
 .../java/org/apache/solr/cloud/MigrateCmd.java  |    9 +-
 .../org/apache/solr/cloud/MoveReplicaCmd.java   |    9 +-
 .../java/org/apache/solr/cloud/Overseer.java    |   16 +-
 .../cloud/OverseerCollectionMessageHandler.java |   46 +-
 .../java/org/apache/solr/cloud/RestoreCmd.java  |    2 +-
 .../org/apache/solr/cloud/SplitShardCmd.java    |  248 ++--
 .../solr/cloud/autoscaling/AutoScaling.java     |   14 +-
 .../cloud/autoscaling/AutoScalingHandler.java   |   70 +-
 .../cloud/autoscaling/ComputePlanAction.java    |   24 +-
 .../cloud/autoscaling/ExecutePlanAction.java    |   38 +-
 .../cloud/autoscaling/HttpTriggerListener.java  |    6 +-
 .../cloud/autoscaling/NodeAddedTrigger.java     |   15 +-
 .../solr/cloud/autoscaling/NodeLostTrigger.java |   14 +-
 .../autoscaling/OverseerTriggerThread.java      |   20 +-
 .../cloud/autoscaling/ScheduledTriggers.java    |   53 +-
 .../cloud/autoscaling/SearchRateTrigger.java    |   10 +-
 .../cloud/autoscaling/SystemLogListener.java    |    6 +-
 .../cloud/autoscaling/TriggerEventQueue.java    |    4 +-
 .../cloud/autoscaling/TriggerListenerBase.java  |    6 +-
 .../solr/cloud/overseer/CollectionMutator.java  |    8 +-
 .../solr/cloud/overseer/ReplicaMutator.java     |    2 +-
 .../solr/cloud/overseer/SliceMutator.java       |    2 +-
 .../org/apache/solr/core/CoreContainer.java     |    6 +-
 .../solr/handler/CdcrReplicatorManager.java     |   11 +-
 .../solr/handler/admin/CollectionsHandler.java  |    2 +-
 .../org/apache/solr/schema/SchemaManager.java   |    3 +-
 .../org/apache/solr/servlet/HttpSolrCall.java   |    5 +-
 .../processor/DistributedUpdateProcessor.java   |    3 +-
 .../src/java/org/apache/solr/util/IdUtils.java  |    1 +
 .../src/java/org/apache/solr/util/TimeOut.java  |   18 +-
 .../java/org/apache/solr/util/TimeSource.java   |   57 -
 .../solr/util/xslt/TransformerProvider.java     |    3 +-
 .../apache/solr/cloud/ActionThrottleTest.java   |   12 +-
 .../org/apache/solr/cloud/AddReplicaTest.java   |    2 +-
 .../test/org/apache/solr/cloud/AssignTest.java  |   10 +-
 ...MonkeyNothingIsSafeWithPullReplicasTest.java |    3 +-
 ...aosMonkeySafeLeaderWithPullReplicasTest.java |    3 +-
 .../cloud/CollectionsAPIDistributedZkTest.java  |    5 +-
 ...ConcurrentDeleteAndCreateCollectionTest.java |    3 +-
 ...DistribDocExpirationUpdateProcessorTest.java |    3 +-
 .../cloud/LeaderFailureAfterFreshStartTest.java |    3 +-
 .../apache/solr/cloud/MigrateRouteKeyTest.java  |    3 +-
 ...verseerCollectionConfigSetProcessorTest.java |    9 +-
 .../apache/solr/cloud/OverseerRolesTest.java    |    3 +-
 .../solr/cloud/PeerSyncReplicationTest.java     |    3 +-
 .../cloud/SharedFSAutoReplicaFailoverTest.java  |    3 +-
 .../TestLeaderInitiatedRecoveryThread.java      |    7 +-
 .../org/apache/solr/cloud/TestPullReplica.java  |    7 +-
 .../cloud/TestPullReplicaErrorHandling.java     |    9 +-
 .../apache/solr/cloud/TestRebalanceLeaders.java |    5 +-
 .../org/apache/solr/cloud/TestTlogReplica.java  |   11 +-
 .../solr/cloud/UnloadDistributedZkTest.java     |    3 +-
 .../AutoAddReplicasIntegrationTest.java         |    3 +-
 .../AutoAddReplicasPlanActionTest.java          |    4 +-
 .../autoscaling/AutoScalingHandlerTest.java     |    3 +-
 .../solr/cloud/autoscaling/CapturedEvent.java   |   63 +
 .../autoscaling/ComputePlanActionTest.java      |   16 +
 .../autoscaling/ExecutePlanActionTest.java      |    2 +-
 .../cloud/autoscaling/NodeAddedTriggerTest.java |    2 +-
 .../cloud/autoscaling/NodeLostTriggerTest.java  |    2 +-
 .../autoscaling/TriggerIntegrationTest.java     |  155 +--
 .../solr/cloud/autoscaling/sim/ActionError.java |   24 +
 .../sim/GenericDistributedQueue.java            |  599 ++++++++
 .../sim/GenericDistributedQueueFactory.java     |   45 +
 .../cloud/autoscaling/sim/LiveNodesSet.java     |   99 ++
 .../cloud/autoscaling/sim/SimCloudManager.java  |  607 +++++++++
 .../sim/SimClusterStateProvider.java            | 1275 ++++++++++++++++++
 .../autoscaling/sim/SimDistribStateManager.java |  580 ++++++++
 .../sim/SimDistributedQueueFactory.java         |  284 ++++
 .../autoscaling/sim/SimNodeStateProvider.java   |  267 ++++
 .../autoscaling/sim/SimSolrCloudTestCase.java   |  251 ++++
 .../sim/TestClusterStateProvider.java           |  221 +++
 .../autoscaling/sim/TestComputePlanAction.java  |  357 +++++
 .../sim/TestDistribStateManager.java            |  284 ++++
 .../autoscaling/sim/TestExecutePlanAction.java  |  216 +++
 .../sim/TestGenericDistributedQueue.java        |   32 +
 .../cloud/autoscaling/sim/TestLargeCluster.java |  266 ++++
 .../autoscaling/sim/TestNodeAddedTrigger.java   |  306 +++++
 .../autoscaling/sim/TestNodeLostTrigger.java    |  331 +++++
 .../cloud/autoscaling/sim/TestPolicyCloud.java  |  357 +++++
 .../sim/TestSimDistributedQueue.java            |  220 +++
 .../autoscaling/sim/TestTriggerIntegration.java | 1217 +++++++++++++++++
 .../cloud/autoscaling/sim/package-info.java     |   21 +
 .../cloud/cdcr/BaseCdcrDistributedZkTest.java   |    3 +-
 .../apache/solr/cloud/hdfs/StressHdfsTest.java  |    3 +-
 .../solr/cloud/overseer/ZkStateReaderTest.java  |    3 +-
 .../apache/solr/cloud/rule/RuleEngineTest.java  |    3 +
 .../solr/core/OpenCloseCoreStressTest.java      |    7 +-
 .../cloud/autoscaling/AutoScalingConfig.java    |    9 +
 .../autoscaling/DelegatingCloudManager.java     |   17 +
 .../DelegatingDistribStateManager.java          |   17 +-
 .../DelegatingNodeStateProvider.java            |   11 +
 .../cloud/autoscaling/DistribStateManager.java  |   23 +-
 .../cloud/autoscaling/NodeStateProvider.java    |    4 +-
 .../client/solrj/cloud/autoscaling/Policy.java  |   21 +-
 .../solrj/cloud/autoscaling/PolicyHelper.java   |   26 +-
 .../solrj/cloud/autoscaling/ReplicaInfo.java    |   33 +-
 .../cloud/autoscaling/SolrCloudManager.java     |   10 +-
 .../solrj/cloud/autoscaling/Suggestion.java     |    2 +-
 .../solr/client/solrj/impl/CloudSolrClient.java |    5 +-
 .../client/solrj/impl/ClusterStateProvider.java |    4 +-
 .../solrj/impl/HttpClusterStateProvider.java    |   21 +-
 .../solrj/impl/SolrClientCloudManager.java      |    6 +
 .../solrj/impl/SolrClientNodeStateProvider.java |    6 +
 .../solrj/impl/ZkDistribStateManager.java       |   32 +-
 .../apache/solr/common/cloud/ClusterState.java  |   10 +
 .../org/apache/solr/common/cloud/Replica.java   |   11 +
 .../apache/solr/common/cloud/ZkStateReader.java |   19 +-
 .../org/apache/solr/common/util/TimeSource.java |  161 +++
 .../java/org/apache/solr/common/util/Utils.java |   23 +-
 .../solr/client/solrj/SolrExampleTestsBase.java |    3 +-
 .../solr/client/solrj/TestLBHttpSolrClient.java |    3 +-
 .../solrj/cloud/autoscaling/TestPolicy.java     |   12 +
 .../cloud/AbstractFullDistribZkTestBase.java    |    7 +-
 .../java/org/apache/solr/cloud/ChaosMonkey.java |    3 +-
 .../org/apache/solr/cloud/ZkTestServer.java     |    3 +-
 126 files changed, 9186 insertions(+), 846 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2949945..d59de38 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -29,6 +29,10 @@ Apache UIMA 2.3.1
 Apache ZooKeeper 3.4.10
 Jetty 9.3.20.v20170531
 
+New Features
+----------------------
+* SOLR-11285: Simulation framework for autoscaling. (ab)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
index f60332c..520a269 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
@@ -20,7 +20,7 @@ import java.lang.invoke.MethodHandles;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.solr.util.TimeSource;
+import org.apache.solr.common.util.TimeSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,9 +35,7 @@ public class ActionThrottle {
   private final TimeSource timeSource;
 
   public ActionThrottle(String name, long minMsBetweenActions) {
-    this.name = name;
-    this.minMsBetweenActions = minMsBetweenActions;
-    this.timeSource = TimeSource.NANO_TIME;
+    this(name, minMsBetweenActions, TimeSource.NANO_TIME);
   }
   
   public ActionThrottle(String name, long minMsBetweenActions, TimeSource timeSource) {
@@ -47,16 +45,20 @@ public class ActionThrottle {
   }
 
   public ActionThrottle(String name, long minMsBetweenActions, long lastActionStartedAt)  {
+    this(name, minMsBetweenActions, lastActionStartedAt, TimeSource.NANO_TIME);
+  }
+
+  public ActionThrottle(String name, long minMsBetweenActions, long lastActionStartedAt, TimeSource timeSource)  {
     this.name = name;
     this.minMsBetweenActions = minMsBetweenActions;
     this.lastActionStartedAt = lastActionStartedAt;
-    this.timeSource = TimeSource.NANO_TIME;
+    this.timeSource = timeSource;
   }
 
   public void reset() {
     lastActionStartedAt = null;
   }
-  
+
   public void markAttemptingAction() {
     lastActionStartedAt = timeSource.getTime();
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
index c785f9f..71a54c14 100644
--- a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.common.SolrCloseableLatch;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
@@ -48,7 +49,6 @@ import org.apache.solr.handler.component.ShardHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
@@ -56,6 +56,7 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
 import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
 
 public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
@@ -69,81 +70,30 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
   @Override
   public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    addReplica(ocmh.zkStateReader.getClusterState(), message, results, null);
+    addReplica(state, message, results, null);
   }
 
   ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
       throws IOException, InterruptedException {
     log.debug("addReplica() : {}", Utils.toJSONString(message));
+    boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+    boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+    final String asyncId = message.getStr(ASYNC);
+
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+    message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
+
     String collection = message.getStr(COLLECTION_PROP);
+    DocCollection coll = clusterState.getCollection(collection);
+
     String node = message.getStr(CoreAdminParams.NODE);
     String shard = message.getStr(SHARD_ID_PROP);
     String coreName = message.getStr(CoreAdminParams.NAME);
     String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
-    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+    int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
     Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
     boolean parallel = message.getBool("parallel", false);
-    boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
-    if (StringUtils.isBlank(coreName)) {
-      coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
-    }
-
-    final String asyncId = message.getStr(ASYNC);
-
-    DocCollection coll = clusterState.getCollection(collection);
-    if (coll == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
-    }
-    if (coll.getSlice(shard) == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "Collection: " + collection + " shard: " + shard + " does not exist");
-    }
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-    boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
-
 
-    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
-    // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
-    if (!skipCreateReplicaInClusterState) {
-      if (CreateShardCmd.usePolicyFramework(coll, ocmh)) {
-        if (node == null) {
-          if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
-          node = Assign.identifyNodes(ocmh,
-              clusterState,
-              Collections.emptyList(),
-              collection,
-              message,
-              Collections.singletonList(shard),
-              replicaType == Replica.Type.NRT ? 0 : 1,
-              replicaType == Replica.Type.TLOG ? 0 : 1,
-              replicaType == Replica.Type.PULL ? 0 : 1
-          ).get(0).node;
-          sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
-        }
-      } else {
-        node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
-            ocmh.overseer.getSolrCloudManager()).get(0).nodeName;// TODO: use replica type in this logic too
-      }
-    }
-    log.info("Node Identified {} for creating new replica", node);
-
-    if (!clusterState.liveNodesContain(node)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
-    }
-    if (coreName == null) {
-      coreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, shard, replicaType);
-    } else if (!skipCreateReplicaInClusterState) {
-      //Validate that the core name is unique in that collection
-      for (Slice slice : coll.getSlices()) {
-        for (Replica replica : slice.getReplicas()) {
-          String replicaCoreName = replica.getStr(CORE_NAME_PROP);
-          if (coreName.equals(replicaCoreName)) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
-                " for this collection");
-          }
-        }
-      }
-    }
     ModifiableSolrParams params = new ModifiableSolrParams();
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
@@ -210,6 +160,8 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
 
     // For tracking async calls.
     Map<String,String> requestMap = new HashMap<>();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
     ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
 
     final String fnode = node;
@@ -253,4 +205,75 @@ public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
     );
   }
 
+  public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
+                                                 ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+
+    String collection = message.getStr(COLLECTION_PROP);
+    String node = message.getStr(CoreAdminParams.NODE);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String coreName = message.getStr(CoreAdminParams.NAME);
+    String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
+    Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
+    if (StringUtils.isBlank(coreName)) {
+      coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
+    }
+
+    DocCollection coll = clusterState.getCollection(collection);
+    if (coll == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+    }
+    if (coll.getSlice(shard) == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Collection: " + collection + " shard: " + shard + " does not exist");
+    }
+
+    // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
+    if (!skipCreateReplicaInClusterState) {
+      if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+        if (node == null) {
+          if(coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
+          node = Assign.identifyNodes(cloudManager,
+              clusterState,
+              Collections.emptyList(),
+              collection,
+              message,
+              Collections.singletonList(shard),
+              replicaType == Replica.Type.NRT ? 0 : 1,
+              replicaType == Replica.Type.TLOG ? 0 : 1,
+              replicaType == Replica.Type.PULL ? 0 : 1
+          ).get(0).node;
+          sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+        }
+      } else {
+        node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+            cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
+      }
+    }
+    log.info("Node Identified {} for creating new replica", node);
+
+    if (!clusterState.liveNodesContain(node)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+    }
+    if (coreName == null) {
+      coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
+    } else if (!skipCreateReplicaInClusterState) {
+      //Validate that the core name is unique in that collection
+      for (Slice slice : coll.getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          String replicaCoreName = replica.getStr(CORE_NAME_PROP);
+          if (coreName.equals(replicaCoreName)) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
+                " for this collection");
+          }
+        }
+      }
+    }
+    if (coreNodeName != null) {
+      message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+    }
+    message = message.plus(CoreAdminParams.NAME, coreName);
+    message = message.plus(CoreAdminParams.NODE, node);
+    return message;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index fd0738f..c746c94 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -114,7 +114,7 @@ public class Assign {
     }
   }
 
-  public static String assignNode(DistribStateManager stateManager, DocCollection collection) {
+  public static String assignCoreNodeName(DistribStateManager stateManager, DocCollection collection) {
     // for backward compatibility;
     int defaultValue = defaultCounterValue(collection, false);
     String coreNodeName = "core_node" + incAndGetId(stateManager, collection.getName(), defaultValue);
@@ -170,7 +170,7 @@ public class Assign {
     return returnShardId;
   }
 
-  private static String buildCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
+  private static String buildSolrCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
     // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
     return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
   }
@@ -187,20 +187,20 @@ public class Assign {
     return defaultValue * 20;
   }
 
-  public static String buildCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
+  public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
     Slice slice = collection.getSlice(shard);
     int defaultValue = defaultCounterValue(collection, newCollection);
     int replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
-    String coreName = buildCoreName(collection.getName(), shard, type, replicaNum);
+    String coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
     while (existCoreName(coreName, slice)) {
       replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
-      coreName = buildCoreName(collection.getName(), shard, type, replicaNum);
+      coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
     }
     return coreName;
   }
 
-  public static String buildCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type) {
-    return buildCoreName(stateManager, collection, shard, type, false);
+  public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type) {
+    return buildSolrCoreName(stateManager, collection, shard, type, false);
   }
 
   private static boolean existCoreName(String coreName, Slice slice) {
@@ -237,7 +237,7 @@ public class Assign {
     return nodeList;
   }
 
-  public static List<ReplicaPosition> identifyNodes(OverseerCollectionMessageHandler ocmh,
+  public static List<ReplicaPosition> identifyNodes(SolrCloudManager cloudManager,
                                                     ClusterState clusterState,
                                                     List<String> nodeList,
                                                     String collectionName,
@@ -248,7 +248,7 @@ public class Assign {
                                                     int numPullReplicas) throws IOException, InterruptedException {
     List<Map> rulesMap = (List) message.get("rule");
     String policyName = message.getStr(POLICY);
-    AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
 
     if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
       log.debug("Identify nodes using default");
@@ -283,7 +283,7 @@ public class Assign {
           (List<Map>) message.get(SNITCH),
           new HashMap<>(),//this is a new collection. So, there are no nodes in any shard
           nodeList,
-          ocmh.overseer.getSolrCloudManager(),
+          cloudManager,
           clusterState);
 
       Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
@@ -294,7 +294,7 @@ public class Assign {
       if (message.getStr(CREATE_NODE_SET) == null)
         nodeList = Collections.emptyList();// unless explicitly specified do not pass node list to Policy
       return getPositionsUsingPolicy(collectionName,
-          shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, ocmh.overseer.getSolrCloudManager(), nodeList);
+          shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, cloudManager, nodeList);
     }
   }
 
@@ -397,7 +397,7 @@ public class Assign {
           nodesList);
       return replicaPositions;
     } catch (Exception e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error closing CloudSolrClient",e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error getting replica locations", e);
     } finally {
       if (log.isTraceEnabled()) {
         if (replicaPositions != null)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java b/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
index 6e0583f..3cdc903 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudConfigSetService.java
@@ -42,7 +42,7 @@ public class CloudConfigSetService extends ConfigSetService {
     try {
       // for back compat with cores that can create collections without the collections API
       if (!zkController.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + cd.getCollectionName(), true)) {
-        CreateCollectionCmd.createCollectionZkNode(zkController.getZkClient(), cd.getCollectionName(), cd.getCloudDescriptor().getParams());
+        CreateCollectionCmd.createCollectionZkNode(zkController.getSolrCloudManager().getDistribStateManager(), cd.getCollectionName(), cd.getCloudDescriptor().getParams());
       }
     } catch (KeeperException e) {
       SolrException.log(log, null, e);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
index 62cde7c..30de3d4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.DocCollection;
@@ -130,4 +132,9 @@ public class CloudUtil {
 
   }
 
+  static boolean usePolicyFramework(DocCollection collection, SolrCloudManager cloudManager)
+      throws IOException, InterruptedException {
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
index 2c4f01e..2171c60 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -18,6 +18,7 @@
 package org.apache.solr.cloud;
 
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -26,11 +27,18 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
 import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.common.SolrException;
@@ -40,7 +48,6 @@ import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -51,6 +58,7 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.admin.ConfigSetsHandlerApi;
 import org.apache.solr.handler.component.ShardHandler;
@@ -70,7 +78,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
 import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
@@ -81,11 +88,13 @@ import static org.apache.solr.common.util.StrUtils.formatString;
 public class CreateCollectionCmd implements Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
-  private SolrZkClient zkClient;
+  private final TimeSource timeSource;
+  private final DistribStateManager stateManager;
 
   public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
-    this.zkClient = ocmh.zkStateReader.getZkClient();
+    this.stateManager = ocmh.cloudManager.getDistribStateManager();
+    this.timeSource = ocmh.cloudManager.getTimeSource();
   }
 
   @Override
@@ -103,95 +112,20 @@ public class CreateCollectionCmd implements Cmd {
     }
 
     ocmh.validateConfigOrThrowSolrException(configName);
-    PolicyHelper.SessionWrapper sessionWrapper = null;
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
 
     try {
-      // look at the replication factor and see if it matches reality
-      // if it does not, find best nodes to create more cores
 
-      int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
-      int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
-      int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
-      Map autoScalingJson = Utils.getJson(ocmh.zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
-      String policy = message.getStr(Policy.POLICY);
-      boolean usePolicyFramework = autoScalingJson.get(Policy.CLUSTER_POLICY) != null || policy != null;
-
-      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
       final String async = message.getStr(ASYNC);
 
-      Integer numSlices = message.getInt(NUM_SLICES, null);
-      String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+      List<String> nodeList = new ArrayList<>();
       List<String> shardNames = new ArrayList<>();
-      if(ImplicitDocRouter.NAME.equals(router)){
-        ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
-        numSlices = shardNames.size();
-      } else {
-        if (numSlices == null ) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
-        }
-        ClusterStateMutator.getShardNames(numSlices, shardNames);
-      }
-
-      int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
-      if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
-      }
-      if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
-      if (numNrtReplicas + numTlogReplicas <= 0) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
-      }
-
-      if (numSlices <= 0) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
-      }
-
-      // we need to look at every node and see how many cores it serves
-      // add our new cores to existing nodes serving the least number of cores
-      // but (for now) require that each core goes on a distinct node.
-
-      final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
-      List<ReplicaPosition> replicaPositions;
-      if (nodeList.isEmpty()) {
-        log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
-
-        replicaPositions = new ArrayList<>();
-      } else {
-        int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
-        if (totalNumReplicas > nodeList.size()) {
-          log.warn("Specified number of replicas of "
-              + totalNumReplicas
-              + " on collection "
-              + collectionName
-              + " is higher than the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
-              + nodeList.size()
-              + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
-        }
-
-        int maxShardsAllowedToCreate = maxShardsPerNode == Integer.MAX_VALUE ?
-            Integer.MAX_VALUE :
-            maxShardsPerNode * nodeList.size();
-        int requestedShardsToCreate = numSlices * totalNumReplicas;
-        if (maxShardsAllowedToCreate < requestedShardsToCreate) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
-              + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
-              + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
-              + ". This allows a maximum of " + maxShardsAllowedToCreate
-              + " to be created. Value of " + NUM_SLICES + " is " + numSlices
-              + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
-              + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
-              + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
-              + ". This requires " + requestedShardsToCreate
-              + " shards to be created (higher than the allowed number)");
-        }
-        replicaPositions = Assign.identifyNodes(ocmh
-            , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
-        sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
-      }
-
+      List<ReplicaPosition> replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, message,
+          nodeList, shardNames, sessionWrapper);
       ZkStateReader zkStateReader = ocmh.zkStateReader;
       boolean isLegacyCloud = Overseer.isLegacy(zkStateReader);
 
-      ocmh.createConfNode(configName, collectionName, isLegacyCloud);
+      ocmh.createConfNode(stateManager, configName, collectionName, isLegacyCloud);
 
       Map<String,String> collectionParams = new HashMap<>();
       Map<String,Object> collectionProps = message.getProperties();
@@ -201,16 +135,16 @@ public class CreateCollectionCmd implements Cmd {
         }
       }
       
-      createCollectionZkNode(zkClient, collectionName, collectionParams);
+      createCollectionZkNode(stateManager, collectionName, collectionParams);
       
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
 
       // wait for a while until we don't see the collection
-      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS);
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
       boolean created = false;
       while (! waitUntil.hasTimedOut()) {
-        Thread.sleep(100);
-        created = zkStateReader.getClusterState().hasCollection(collectionName);
+        waitUntil.sleep(100);
+        created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
         if(created) break;
       }
       if (!created)
@@ -225,12 +159,14 @@ public class CreateCollectionCmd implements Cmd {
       Map<String, String> requestMap = new HashMap<>();
 
 
-      log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , nrtReplicas : {2}, tlogReplicas: {3}, pullReplicas: {4}",
-          collectionName, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas));
+      log.debug(formatString("Creating SolrCores for new collection {0}, shardNames {1} , message : {2}",
+          collectionName, shardNames, message));
       Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
       for (ReplicaPosition replicaPosition : replicaPositions) {
         String nodeName = replicaPosition.node;
-        String coreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), zkStateReader.getClusterState().getCollection(collectionName),
+        String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
+            ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
             replicaPosition.shard, replicaPosition.type, true);
         log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
             , coreName, replicaPosition.shard, collectionName, nodeName));
@@ -260,7 +196,7 @@ public class CreateCollectionCmd implements Cmd {
         params.set(COLL_CONF, configName);
         params.set(CoreAdminParams.COLLECTION, collectionName);
         params.set(CoreAdminParams.SHARD, replicaPosition.shard);
-        params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+        params.set(ZkStateReader.NUM_SHARDS_PROP, shardNames.size());
         params.set(CoreAdminParams.NEW_COLLECTION, "true");
         params.set(CoreAdminParams.REPLICA_TYPE, replicaPosition.type.name());
 
@@ -320,10 +256,93 @@ public class CreateCollectionCmd implements Cmd {
     } catch (Exception ex) {
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
     } finally {
-      if(sessionWrapper != null) sessionWrapper.release();
+      if (sessionWrapper.get() != null) sessionWrapper.get().release();
     }
   }
 
+  public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+                                                            ZkNodeProps message,
+                                                            List<String> nodeList, List<String> shardNames,
+                                                            AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    final String collectionName = message.getStr(NAME);
+    // look at the replication factor and see if it matches reality
+    // if it does not, find best nodes to create more cores
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, 0);
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
+    int numPullReplicas = message.getInt(PULL_REPLICAS, 0);
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    String policy = message.getStr(Policy.POLICY);
+    boolean usePolicyFramework = !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || policy != null;
+
+    Integer numSlices = message.getInt(NUM_SLICES, null);
+    String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+    if(ImplicitDocRouter.NAME.equals(router)){
+      ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+      numSlices = shardNames.size();
+    } else {
+      if (numSlices == null ) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
+      }
+      if (numSlices <= 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
+      }
+      ClusterStateMutator.getShardNames(numSlices, shardNames);
+    }
+
+    int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+    if (usePolicyFramework && message.getStr(MAX_SHARDS_PER_NODE) != null && maxShardsPerNode > 0) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "'maxShardsPerNode>0' is not supported when autoScaling policies are used");
+    }
+    if (maxShardsPerNode == -1 || usePolicyFramework) maxShardsPerNode = Integer.MAX_VALUE;
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+
+    // we need to look at every node and see how many cores it serves
+    // add our new cores to existing nodes serving the least number of cores
+    // but (for now) require that each core goes on a distinct node.
+
+    List<ReplicaPosition> replicaPositions;
+    nodeList.addAll(Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM));
+    if (nodeList.isEmpty()) {
+      log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
+
+      replicaPositions = new ArrayList<>();
+    } else {
+      int totalNumReplicas = numNrtReplicas + numTlogReplicas + numPullReplicas;
+      if (totalNumReplicas > nodeList.size()) {
+        log.warn("Specified number of replicas of "
+            + totalNumReplicas
+            + " on collection "
+            + collectionName
+            + " is higher than the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+            + nodeList.size()
+            + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
+      }
+
+      int maxShardsAllowedToCreate = maxShardsPerNode == Integer.MAX_VALUE ?
+          Integer.MAX_VALUE :
+          maxShardsPerNode * nodeList.size();
+      int requestedShardsToCreate = numSlices * totalNumReplicas;
+      if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+            + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+            + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
+            + ". This allows a maximum of " + maxShardsAllowedToCreate
+            + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+            + ", value of " + NRT_REPLICAS + " is " + numNrtReplicas
+            + ", value of " + TLOG_REPLICAS + " is " + numTlogReplicas
+            + " and value of " + PULL_REPLICAS + " is " + numPullReplicas
+            + ". This requires " + requestedShardsToCreate
+            + " shards to be created (higher than the allowed number)");
+      }
+      replicaPositions = Assign.identifyNodes(cloudManager
+          , clusterState, nodeList, collectionName, message, shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas);
+      sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+    }
+    return replicaPositions;
+  }
+
   String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
     String configName = message.getStr(COLL_CONF);
 
@@ -375,12 +394,12 @@ public class CreateCollectionCmd implements Cmd {
     }
   }
 
-  public static void createCollectionZkNode(SolrZkClient zkClient, String collection, Map<String,String> params) {
+  public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
     log.debug("Check for collection zkNode:" + collection);
     String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
 
     try {
-      if (!zkClient.exists(collectionPath, true)) {
+      if (!stateManager.hasData(collectionPath)) {
         log.debug("Creating collection in ZooKeeper:" + collection);
 
         try {
@@ -394,7 +413,7 @@ public class CreateCollectionCmd implements Cmd {
             // if the config name wasn't passed in, use the default
             if (!collectionProps.containsKey(ZkController.CONFIGNAME_PROP)) {
               // users can create the collection node and conf link ahead of time, or this may return another option
-              getConfName(zkClient, collection, collectionPath, collectionProps);
+              getConfName(stateManager, collection, collectionPath, collectionProps);
             }
 
           } else if (System.getProperty("bootstrap_confdir") != null) {
@@ -417,19 +436,21 @@ public class CreateCollectionCmd implements Cmd {
             // the conf name should should be the collection name of this core
             collectionProps.put(ZkController.CONFIGNAME_PROP, collection);
           } else {
-            getConfName(zkClient, collection, collectionPath, collectionProps);
+            getConfName(stateManager, collection, collectionPath, collectionProps);
           }
 
           collectionProps.remove(ZkStateReader.NUM_SHARDS_PROP);  // we don't put numShards in the collections properties
 
           ZkNodeProps zkProps = new ZkNodeProps(collectionProps);
-          zkClient.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, null, true);
+          stateManager.makePath(collectionPath, Utils.toJSON(zkProps), CreateMode.PERSISTENT, false);
 
         } catch (KeeperException e) {
           // it's okay if the node already exists
           if (e.code() != KeeperException.Code.NODEEXISTS) {
             throw e;
           }
+        } catch (AlreadyExistsException e) {
+          // it's okay if the node already exists
         }
       } else {
         log.debug("Collection zkNode exists");
@@ -441,6 +462,8 @@ public class CreateCollectionCmd implements Cmd {
         return;
       }
       throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
+    } catch (IOException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
     } catch (InterruptedException e) {
       Thread.interrupted();
       throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
@@ -448,8 +471,8 @@ public class CreateCollectionCmd implements Cmd {
 
   }
   
-  private static void getConfName(SolrZkClient zkClient, String collection, String collectionPath, Map<String,Object> collectionProps) throws KeeperException,
-  InterruptedException {
+  private static void getConfName(DistribStateManager stateManager, String collection, String collectionPath, Map<String,Object> collectionProps) throws IOException,
+      KeeperException, InterruptedException {
     // check for configName
     log.debug("Looking for collection configName");
     if (collectionProps.containsKey("configName")) {
@@ -461,17 +484,17 @@ public class CreateCollectionCmd implements Cmd {
     int retry = 1;
     int retryLimt = 6;
     for (; retry < retryLimt; retry++) {
-      if (zkClient.exists(collectionPath, true)) {
-        ZkNodeProps cProps = ZkNodeProps.load(zkClient.getData(collectionPath, null, null, true));
+      if (stateManager.hasData(collectionPath)) {
+        VersionedData data = stateManager.getData(collectionPath);
+        ZkNodeProps cProps = ZkNodeProps.load(data.getData());
         if (cProps.containsKey(ZkController.CONFIGNAME_PROP)) {
           break;
         }
       }
 
       try {
-        configNames = zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null,
-            true);
-      } catch (NoNodeException e) {
+        configNames = stateManager.listData(ZkConfigManager.CONFIGS_ZKNODE);
+      } catch (NoSuchElementException | NoNodeException e) {
         // just keep trying
       }
 
@@ -507,15 +530,4 @@ public class CreateCollectionCmd implements Cmd {
           "Could not find configName for collection " + collection + " found:" + configNames);
     }
   }
-
-  public static boolean usePolicyFramework(ZkStateReader zkStateReader, ZkNodeProps message) {
-    Map autoScalingJson = Collections.emptyMap();
-    try {
-      autoScalingJson = Utils.getJson(zkStateReader.getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
-    } catch (Exception e) {
-      return false;
-    }
-    return autoScalingJson.get(Policy.CLUSTER_POLICY) != null || message.getStr(Policy.POLICY) != null;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
index 18b0b63..c6afdcc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -25,11 +25,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
 import org.apache.solr.common.SolrCloseableLatch;
 import org.apache.solr.common.SolrException;
@@ -76,59 +77,21 @@ public class CreateShardCmd implements Cmd {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
 
     DocCollection collection = clusterState.getCollection(collectionName);
-    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
-    int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
-    int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
-    int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
-    
-    if (numNrtReplicas + numTlogReplicas <= 0) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
-    }
-    
-    Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
 
     ZkStateReader zkStateReader = ocmh.zkStateReader;
-    PolicyHelper.SessionWrapper sessionWrapper = null;
-    boolean usePolicyFramework = usePolicyFramework(collection,ocmh);
-    List<ReplicaPosition> positions = null;
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
     SolrCloseableLatch countDownLatch;
     try {
-      if (usePolicyFramework) {
-        if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
-        positions = Assign.identifyNodes(ocmh,
-            clusterState,
-            Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM),
-            collectionName,
-            message,
-            Collections.singletonList(sliceName),
-            numNrtReplicas,
-            numTlogReplicas,
-            numPullReplicas);
-        sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
-      } else {
-        List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, totalReplicas,
-            createNodeSetStr, ocmh.overseer.getSolrCloudManager());
-        int i = 0;
-        positions = new ArrayList<>();
-        for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
-            Replica.Type.TLOG, numTlogReplicas,
-            Replica.Type.PULL, numPullReplicas
-        ).entrySet()) {
-          for (int j = 0; j < e.getValue(); j++) {
-            positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
-            i++;
-          }
-        }
-      }
+      List<ReplicaPosition> positions = buildReplicaPositions(ocmh.cloudManager, clusterState, collectionName, message, sessionWrapper);
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
       // wait for a while until we see the shard
       ocmh.waitForNewShard(collectionName, sliceName);
 
       String async = message.getStr(ASYNC);
-      countDownLatch = new SolrCloseableLatch(totalReplicas, ocmh);
+      countDownLatch = new SolrCloseableLatch(positions.size(), ocmh);
       for (ReplicaPosition position : positions) {
         String nodeName = position.node;
-        String coreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), collection, sliceName, position.type);
+        String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(), collection, sliceName, position.type);
         log.info("Creating replica " + coreName + " as part of slice " + sliceName + " of collection " + collectionName
             + " on " + nodeName);
 
@@ -166,7 +129,7 @@ public class CreateShardCmd implements Cmd {
         });
       }
     } finally {
-      if(sessionWrapper != null) sessionWrapper.release();
+      if (sessionWrapper.get() != null) sessionWrapper.get().release();
     }
 
     log.debug("Waiting for create shard action to complete");
@@ -177,9 +140,52 @@ public class CreateShardCmd implements Cmd {
 
   }
 
-  static boolean usePolicyFramework(DocCollection collection, OverseerCollectionMessageHandler ocmh)
-      throws IOException, InterruptedException {
-    AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
-    return !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty() || collection.getPolicyName() != null;
+  public static List<ReplicaPosition> buildReplicaPositions(SolrCloudManager cloudManager, ClusterState clusterState,
+         String collectionName, ZkNodeProps message, AtomicReference< PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    String sliceName = message.getStr(SHARD_ID_PROP);
+    DocCollection collection = clusterState.getCollection(collectionName);
+
+    int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
+    int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
+    int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
+    int totalReplicas = numNrtReplicas + numPullReplicas + numTlogReplicas;
+
+    if (numNrtReplicas + numTlogReplicas <= 0) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NRT_REPLICAS + " + " + TLOG_REPLICAS + " must be greater than 0");
+    }
+
+    Object createNodeSetStr = message.get(OverseerCollectionMessageHandler.CREATE_NODE_SET);
+
+    boolean usePolicyFramework = CloudUtil.usePolicyFramework(collection, cloudManager);
+    List<ReplicaPosition> positions;
+    if (usePolicyFramework) {
+      if (collection.getPolicyName() != null) message.getProperties().put(Policy.POLICY, collection.getPolicyName());
+      positions = Assign.identifyNodes(cloudManager,
+          clusterState,
+          Assign.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM),
+          collection.getName(),
+          message,
+          Collections.singletonList(sliceName),
+          numNrtReplicas,
+          numTlogReplicas,
+          numPullReplicas);
+      sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+    } else {
+      List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collection.getName(), sliceName, totalReplicas,
+          createNodeSetStr, cloudManager);
+      int i = 0;
+      positions = new ArrayList<>();
+      for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+          Replica.Type.TLOG, numTlogReplicas,
+          Replica.Type.PULL, numPullReplicas
+      ).entrySet()) {
+        for (int j = 0; j < e.getValue(); j++) {
+          positions.add(new ReplicaPosition(sliceName, j + 1, e.getKey(), sortedNodeList.get(i % sortedNodeList.size()).nodeName));
+          i++;
+        }
+      }
+    }
+    return positions;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
index d2e40f7..dc91905 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.util.TimeOut;
@@ -49,9 +50,11 @@ import static org.apache.solr.common.params.CommonParams.NAME;
 public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
 
   public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
   }
 
   @Override
@@ -94,13 +97,13 @@ public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
 
       // wait for a while until we don't see the collection
-      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
       boolean removed = false;
       while (! timeout.hasTimedOut()) {
-        Thread.sleep(100);
+        timeout.sleep(100);
         removed = !zkStateReader.getClusterState().hasCollection(collection);
         if (removed) {
-          Thread.sleep(500); // just a bit of time so it's more likely other
+          timeout.sleep(500); // just a bit of time so it's more likely other
           // readers see on return
           break;
         }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
index f13fed5..58c4e63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
@@ -55,9 +56,11 @@ import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 public class DeleteShardCmd implements Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
 
   public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
   }
 
   @Override
@@ -134,14 +137,14 @@ public class DeleteShardCmd implements Cmd {
       Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
 
       // wait for a while until we don't see the shard
-      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
       boolean removed = false;
       while (!timeout.hasTimedOut()) {
-        Thread.sleep(100);
+        timeout.sleep(100);
         DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
         removed = collection.getSlice(sliceId) == null;
         if (removed) {
-          Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+          timeout.sleep(100); // just a bit of time so it's more likely other readers see on return
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
index cacccb0..02fdb5c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
@@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -63,9 +64,11 @@ import static org.apache.solr.common.util.Utils.makeMap;
 public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
 
   public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
   }
 
 
@@ -179,10 +182,10 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
 
     // wait for a while until we see the new rule
     log.info("Waiting to see routing rule updated in clusterstate");
-    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS);
+    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS, timeSource);
     boolean added = false;
     while (!waitUntil.hasTimedOut()) {
-      Thread.sleep(100);
+      waitUntil.sleep(100);
       sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
       sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
       Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
@@ -257,7 +260,7 @@ public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
 
     log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
         tempSourceCollectionName, targetLeader.getNodeName());
-    String tempCollectionReplica2 = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
+    String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
         zkStateReader.getClusterState().getCollection(tempSourceCollectionName), tempSourceSlice.getName(), Replica.Type.NRT);
     props = new HashMap<>();
     props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
index 71d5c82..44493ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.TimeOut;
@@ -54,9 +55,11 @@ public class MoveReplicaCmd implements Cmd{
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
 
   public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
     this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
   }
 
   @Override
@@ -158,11 +161,11 @@ public class MoveReplicaCmd implements Cmd{
         return;
       }
 
-      TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS);
+      TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource);
       while (!timeOut.hasTimedOut()) {
         coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName());
         if (coll.getReplica(replica.getName()) != null) {
-          Thread.sleep(100);
+          timeOut.sleep(100);
         } else {
           break;
         }
@@ -233,7 +236,7 @@ public class MoveReplicaCmd implements Cmd{
 
   private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
                                  DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
-    String newCoreName = Assign.buildCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
+    String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
     ZkNodeProps addReplicasProps = new ZkNodeProps(
         COLLECTION_PROP, coll.getName(),
         SHARD_ID_PROP, slice.getName(),

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3b65d6f..d1bb13a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -80,7 +80,7 @@ public class Overseer implements SolrCloseable {
   enum LeaderStatus {DONT_KNOW, NO, YES}
 
   private class ClusterStateUpdater implements Runnable, Closeable {
-    
+
     private final ZkStateReader reader;
     private final SolrZkClient zkClient;
     private final String myId;
@@ -88,7 +88,7 @@ public class Overseer implements SolrCloseable {
     private final ZkDistributedQueue stateUpdateQueue;
     //TODO remove in 9.0, we do not push message into this queue anymore
     //Internal queue where overseer stores events that have not yet been published into cloudstate
-    //If Overseer dies while extracting the main queue a new overseer will start from this queue 
+    //If Overseer dies while extracting the main queue a new overseer will start from this queue
     private final ZkDistributedQueue workQueue;
     // Internal map which holds the information about running tasks.
     private final DistributedMap runningMap;
@@ -120,7 +120,7 @@ public class Overseer implements SolrCloseable {
     public Stats getWorkQueueStats()  {
       return workQueue.getZkStats();
     }
-    
+
     @Override
     public void run() {
 
@@ -442,7 +442,7 @@ public class Overseer implements SolrCloseable {
 
   }
 
-  static class OverseerThread extends Thread implements Closeable {
+  public static class OverseerThread extends Thread implements Closeable {
 
     protected volatile boolean isClosed;
     private Closeable thread;
@@ -466,9 +466,9 @@ public class Overseer implements SolrCloseable {
     public boolean isClosed() {
       return this.isClosed;
     }
-    
+
   }
-  
+
   private OverseerThread ccThread;
 
   private OverseerThread updaterThread;
@@ -478,7 +478,7 @@ public class Overseer implements SolrCloseable {
   private final ZkStateReader reader;
 
   private final ShardHandler shardHandler;
-  
+
   private final UpdateShardHandler updateShardHandler;
 
   private final String adminPath;
@@ -505,7 +505,7 @@ public class Overseer implements SolrCloseable {
     this.stats = new Stats();
     this.config = config;
   }
-  
+
   public synchronized void start(String id) {
     this.id = id;
     closed = false;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index af04453..60b098a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -36,7 +36,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
@@ -66,6 +69,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.SuppressForbidden;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
@@ -74,6 +78,7 @@ import org.apache.solr.handler.component.ShardResponse;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.RTimer;
 import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -144,8 +149,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   ShardHandlerFactory shardHandlerFactory;
   String adminPath;
   ZkStateReader zkStateReader;
+  SolrCloudManager cloudManager;
   String myId;
   Stats stats;
+  TimeSource timeSource;
 
   // Set that tracks collections that are currently being processed by a running task.
   // This is used for handling mutual exclusion of the tasks.
@@ -183,6 +190,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     this.myId = myId;
     this.stats = stats;
     this.overseer = overseer;
+    this.cloudManager = overseer.getSolrCloudManager();
+    this.timeSource = cloudManager.getTimeSource();
     this.isClosed = false;
     commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
         .put(REPLACENODE, new ReplaceNodeCmd(this))
@@ -230,7 +239,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       CollectionAction action = getCollectionAction(operation);
       Cmd command = commandMap.get(action);
       if (command != null) {
-        command.call(zkStateReader.getClusterState(), message, results);
+        command.call(cloudManager.getClusterStateProvider().getClusterState(), message, results);
       } else {
         throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
             + operation);
@@ -424,9 +433,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   }
 
   boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
-    TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
+    TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
     while (! timeout.hasTimedOut()) {
-      Thread.sleep(100);
+      timeout.sleep(100);
       DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
       if (docCollection == null) { // someone already deleted the collection
         return true;
@@ -466,7 +475,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
 
     boolean firstLoop = true;
     // wait for a while until the state format changes
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
     while (! timeout.hasTimedOut()) {
       DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
       if (collection == null) {
@@ -484,7 +493,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
         Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
       }
-      Thread.sleep(100);
+      timeout.sleep(100);
     }
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
   }
@@ -643,16 +652,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       validateConfigOrThrowSolrException(configName);
       
       boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
-      createConfNode(configName, collectionName, isLegacyCloud);
+      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
       reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
     }
     
     overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
 
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
     boolean areChangesVisible = true;
     while (!timeout.hasTimedOut()) {
-      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+      DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
       areChangesVisible = true;
       for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
         String updateKey = updateEntry.getKey();
@@ -664,7 +673,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         }
       }
       if (areChangesVisible) break;
-      Thread.sleep(100);
+      timeout.sleep(100);
     }
 
     if (!areChangesVisible)
@@ -681,7 +690,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
 
   Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
     Map<String, Replica> result = new HashMap<>();
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
     while (true) {
       DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
       for (String coreName : coreNames) {
@@ -747,8 +756,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   }
 
 
-  void validateConfigOrThrowSolrException(String configName) throws KeeperException, InterruptedException {
-    boolean isValid = zkStateReader.getZkClient().exists(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName, true);
+  void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
+    boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
     if(!isValid) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
     }
@@ -758,16 +767,16 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
    * This doesn't validate the config (path) itself and is just responsible for creating the confNode.
    * That check should be done before the config node is created.
    */
-  void createConfNode(String configName, String coll, boolean isLegacyCloud) throws KeeperException, InterruptedException {
+  public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
     
     if (configName != null) {
       String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
       log.debug("creating collections conf node {} ", collDir);
       byte[] data = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
-      if (zkStateReader.getZkClient().exists(collDir, true)) {
-        zkStateReader.getZkClient().setData(collDir, data, true);
+      if (stateManager.hasData(collDir)) {
+        stateManager.setData(collDir, data, -1);
       } else {
-        zkStateReader.getZkClient().makePath(collDir, data, true);
+        stateManager.makePath(collDir, data, CreateMode.PERSISTENT, false);
       }
     } else {
       if(isLegacyCloud){
@@ -776,7 +785,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
         throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
       }
     }
-
   }
   
   private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
@@ -972,8 +980,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
     );
   }
 
-  public final PolicyHelper.SessionRef policySessionRef = new PolicyHelper.SessionRef();
-
   @Override
   public void close() throws IOException {
     this.isClosed = true;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b1ce5e20/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
index 039ab5c..9c9a5c9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -223,7 +223,7 @@ public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
 
     try {
       List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
-          ocmh, clusterState,
+          ocmh.cloudManager, clusterState,
           nodeList, restoreCollectionName,
           message, sliceNames,
           numNrtReplicas, numTlogReplicas, numPullReplicas);