Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/23 10:30:50 UTC

[20/41] lucene-solr:jira/solr-11702: SOLR-11817: Move Collections API classes to their own package

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
deleted file mode 100644
index 607588c..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RoutedAliasCreateCollectionCmd.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Aliases;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.handler.admin.CollectionsHandler;
-import org.apache.solr.request.LocalSolrQueryRequest;
-import org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor;
-import org.apache.solr.util.TimeZoneUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_FIELD_METADATA;
-import static org.apache.solr.update.processor.TimeRoutedAliasUpdateProcessor.ROUTER_INTERVAL_METADATA;
-
-/**
- * For "routed aliases", creates another collection and adds it to the alias. In some cases it will not
- * add a new collection.
- * If a collection is created, then collection creation info is returned.
- *
- * Note: this logic is within an Overseer because we want to leverage the mutual exclusion
- * property afforded by the lock it obtains on the alias name.
- * @since 7.3
- */
-public class RoutedAliasCreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  public static final String IF_MOST_RECENT_COLL_NAME = "ifMostRecentCollName";
-
-  public static final String COLL_METAPREFIX = "collection-create.";
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public RoutedAliasCreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  /* TODO:
-  There are a few classes related to time routed alias processing.  We need to share some logic better.
-   */
-
-
-  @Override
-  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
-    //---- PARSE PRIMARY MESSAGE PARAMS
-    // important that we use NAME for the alias as that is what the Overseer will get a lock on before calling us
-    final String aliasName = message.getStr(NAME);
-    // the client believes this is the most recent collection name; if it's provided, we verify it below
-    final String ifMostRecentCollName = message.getStr(IF_MOST_RECENT_COLL_NAME); // optional
-
-    // TODO collection param (or intervalDateMath override?), useful for data capped collections
-
-    //---- PARSE ALIAS INFO FROM ZK
-    final ZkStateReader.AliasesManager aliasesHolder = ocmh.zkStateReader.aliasesHolder;
-    final Aliases aliases = aliasesHolder.getAliases();
-    final Map<String, String> aliasMetadata = aliases.getCollectionAliasMetadata(aliasName);
-    if (aliasMetadata == null) {
-      throw newAliasMustExistException(aliasName); // if it did exist, we'd have a non-null map
-    }
-
-    String routeField = aliasMetadata.get(ROUTER_FIELD_METADATA);
-    if (routeField == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "This command only works on time routed aliases.  Expected alias metadata not found.");
-    }
-    String intervalDateMath = aliasMetadata.getOrDefault(ROUTER_INTERVAL_METADATA, "+1DAY");
-    TimeZone intervalTimeZone = TimeZoneUtils.parseTimezone(aliasMetadata.get(CommonParams.TZ));
-
-    //TODO this is ugly; how can we organize the code related to this feature better?
-    final List<Map.Entry<Instant, String>> parsedCollections =
-        TimeRoutedAliasUpdateProcessor.parseCollections(aliasName, aliases, () -> newAliasMustExistException(aliasName));
-
-    //---- GET MOST RECENT COLL
-    final Map.Entry<Instant, String> mostRecentEntry = parsedCollections.get(0);
-    final Instant mostRecentCollTimestamp = mostRecentEntry.getKey();
-    final String mostRecentCollName = mostRecentEntry.getValue();
-    if (ifMostRecentCollName != null) {
-      if (!mostRecentCollName.equals(ifMostRecentCollName)) {
-        // Possibly a benign race: URPs on multiple leaders may call us concurrently for the same alias
-        String msg = IF_MOST_RECENT_COLL_NAME + " expected " + ifMostRecentCollName + " but it's " + mostRecentCollName;
-        if (parsedCollections.stream().map(Map.Entry::getValue).noneMatch(ifMostRecentCollName::equals)) {
-          msg += ". Furthermore this collection isn't in the list of collections referenced by the alias.";
-        }
-        log.info(msg);
-        results.add("message", msg);
-        return;
-      }
-    } else if (mostRecentCollTimestamp.isAfter(Instant.now())) {
-      final String msg = "Most recent collection is in the future, so we won't create another.";
-      log.info(msg);
-      results.add("message", msg);
-      return;
-    }
-
-    //---- COMPUTE NEXT COLLECTION NAME
-    final Instant nextCollTimestamp = TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp(mostRecentCollTimestamp, intervalDateMath, intervalTimeZone);
-    assert nextCollTimestamp.isAfter(mostRecentCollTimestamp);
-    final String createCollName = TimeRoutedAliasUpdateProcessor.formatCollectionNameFromInstant(aliasName, nextCollTimestamp);
-
-    //---- CREATE THE COLLECTION
-    // Map alias metadata starting with a prefix to a create-collection API request
-    final ModifiableSolrParams createReqParams = new ModifiableSolrParams();
-    for (Map.Entry<String, String> e : aliasMetadata.entrySet()) {
-      if (e.getKey().startsWith(COLL_METAPREFIX)) {
-        createReqParams.set(e.getKey().substring(COLL_METAPREFIX.length()), e.getValue());
-      }
-    }
-    if (createReqParams.get(COLL_CONF) == null) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "We require an explicit " + COLL_CONF );
-    }
-    createReqParams.set(NAME, createCollName);
-    createReqParams.set("property." + TimeRoutedAliasUpdateProcessor.TIME_PARTITION_ALIAS_NAME_CORE_PROP, aliasName);
-    // a CollectionOperation reads params and produces a message (Map) that is supposed to be sent to the Overseer.
-    //   Although we could create the Map without it, there are a fair amount of rules we don't want to reproduce.
-    final Map<String, Object> createMsgMap = CollectionsHandler.CollectionOperation.CREATE_OP.execute(
-        new LocalSolrQueryRequest(null, createReqParams),
-        null,
-        ocmh.overseer.getCoreContainer().getCollectionsHandler());
-    createMsgMap.put(Overseer.QUEUE_OPERATION, "create");
-    // Since we are running in the Overseer here, send the message directly to the Overseer CreateCollectionCmd
-    ocmh.commandMap.get(CollectionParams.CollectionAction.CREATE).call(clusterState, new ZkNodeProps(createMsgMap), results);
-
-    CollectionsHandler.waitForActiveCollection(createCollName, null, ocmh.overseer.getCoreContainer(), new OverseerSolrResponse(results));
-
-    //TODO delete some of the oldest collection(s) ?
-
-    //---- UPDATE THE ALIAS
-    aliasesHolder.applyModificationAndExportToZk(curAliases -> {
-      final List<String> curTargetCollections = curAliases.getCollectionAliasListMap().get(aliasName);
-      if (curTargetCollections.contains(createCollName)) {
-        return curAliases;
-      } else {
-        List<String> newTargetCollections = new ArrayList<>(curTargetCollections.size() + 1);
-        // prepend it on purpose (thus reverse sorted). Solr alias resolution defaults to the first collection in a list
-        newTargetCollections.add(createCollName);
-        newTargetCollections.addAll(curTargetCollections);
-        return curAliases.cloneWithCollectionAlias(aliasName, StrUtils.join(newTargetCollections, ','));
-      }
-    });
-
-  }
-
-  private SolrException newAliasMustExistException(String aliasName) {
-    return new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-        "Alias " + aliasName + " does not exist.");
-  }
-
-}
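
The command above derives the next collection's name by advancing the newest
collection's timestamp by the alias's date-math interval. A minimal, JDK-only
sketch of that naming scheme (the "yyyy-MM-dd" pattern and the fixed "+1DAY"
interval are illustrative assumptions; the real logic lives in
TimeRoutedAliasUpdateProcessor.computeNextCollTimestamp and
formatCollectionNameFromInstant):

    import java.time.Instant;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.time.temporal.ChronoUnit;

    // Hypothetical illustration only; not the Solr implementation.
    public class RoutedAliasNamingSketch {
      private static final DateTimeFormatter FMT =
          DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

      static String nextCollectionName(String aliasName, Instant mostRecent) {
        // stand-in for computeNextCollTimestamp with an interval of "+1DAY"
        Instant next = mostRecent.plus(1, ChronoUnit.DAYS);
        // stand-in for formatCollectionNameFromInstant
        return aliasName + "_" + FMT.format(next);
      }

      public static void main(String[] args) {
        // the alias "logs" whose newest collection covers 2018-01-23
        // would get "logs_2018-01-24" as the next collection
        System.out.println(nextCollectionName("logs",
            Instant.parse("2018-01-23T00:00:00Z")));
      }
    }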

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
deleted file mode 100644
index 9732616..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CompositeIdRouter;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.PlainIdRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CommonAdminParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.util.TestInjection;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-
-public class SplitShardCmd implements Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public SplitShardCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    split(state, message, results);
-  }
-
-  public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
-    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
-    String collectionName = message.getStr(CoreAdminParams.COLLECTION);
-
-    log.info("Split shard invoked");
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    zkStateReader.forceUpdateCollection(collectionName);
-    AtomicReference<String> slice = new AtomicReference<>();
-    slice.set(message.getStr(ZkStateReader.SHARD_ID_PROP));
-
-    String splitKey = message.getStr("split.key");
-    DocCollection collection = clusterState.getCollection(collectionName);
-
-    PolicyHelper.SessionWrapper sessionWrapper = null;
-
-    Slice parentSlice = getParentSlice(clusterState, collectionName, slice, splitKey);
-
-    // find the leader for the shard
-    Replica parentShardLeader = null;
-    try {
-      parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice.get(), 10000);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt(); throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted waiting for parent shard leader", e); // avoid NPE on a null leader below
-    }
-
-    // let's record the ephemeralOwner of the parent leader node
-    Stat leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
-    if (leaderZnodeStat == null)  {
-      // we just got to know the leader but its live node is gone already!
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
-    }
-
-    List<DocRouter.Range> subRanges = new ArrayList<>();
-    List<String> subSlices = new ArrayList<>();
-    List<String> subShardNames = new ArrayList<>();
-
-    String rangesStr = fillRanges(ocmh.cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames);
-
-    try {
-
-      boolean oldShardsDeleted = false;
-      for (String subSlice : subSlices) {
-        Slice oSlice = collection.getSlice(subSlice);
-        if (oSlice != null) {
-          final Slice.State state = oSlice.getState();
-          if (state == Slice.State.ACTIVE) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-                "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
-          } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
-            // delete the shards
-            log.info("Sub-shard: {} already exists therefore requesting its deletion", subSlice);
-            Map<String, Object> propMap = new HashMap<>();
-            propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
-            propMap.put(COLLECTION_PROP, collectionName);
-            propMap.put(SHARD_ID_PROP, subSlice);
-            ZkNodeProps m = new ZkNodeProps(propMap);
-            try {
-              ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
-            } catch (Exception e) {
-              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + subSlice,
-                  e);
-            }
-
-            oldShardsDeleted = true;
-          }
-        }
-      }
-
-      if (oldShardsDeleted) {
-        // refresh the locally cached cluster state
-        // we know we have the latest because otherwise deleteshard would have failed
-        clusterState = zkStateReader.getClusterState();
-        collection = clusterState.getCollection(collectionName);
-      }
-
-      final String asyncId = message.getStr(ASYNC);
-      Map<String, String> requestMap = new HashMap<>();
-      String nodeName = parentShardLeader.getNodeName();
-
-      for (int i = 0; i < subRanges.size(); i++) {
-        String subSlice = subSlices.get(i);
-        String subShardName = subShardNames.get(i);
-        DocRouter.Range subRange = subRanges.get(i);
-
-        log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
-
-        Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
-        propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
-        propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
-        propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
-        propMap.put("shard_parent_node", parentShardLeader.getNodeName());
-        propMap.put("shard_parent_zk_session", leaderZnodeStat.getEphemeralOwner());
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-
-        // wait until we are able to see the new shard in cluster state
-        ocmh.waitForNewShard(collectionName, subSlice);
-
-        // refresh cluster state
-        clusterState = zkStateReader.getClusterState();
-
-        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
-            + " on " + nodeName);
-        propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
-        propMap.put(COLLECTION_PROP, collectionName);
-        propMap.put(SHARD_ID_PROP, subSlice);
-        propMap.put("node", nodeName);
-        propMap.put(CoreAdminParams.NAME, subShardName);
-        propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-        // copy over property params:
-        for (String key : message.keySet()) {
-          if (key.startsWith(COLL_PROP_PREFIX)) {
-            propMap.put(key, message.getStr(key));
-          }
-        }
-        // add async param
-        if (asyncId != null) {
-          propMap.put(ASYNC, asyncId);
-        }
-        ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
-      }
-
-      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-
-      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
-
-      for (String subShardName : subShardNames) {
-        // wait for parent leader to acknowledge the sub-shard core
-        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
-        String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
-        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-        cmd.setCoreName(subShardName);
-        cmd.setNodeName(nodeName);
-        cmd.setCoreNodeName(coreNodeName);
-        cmd.setState(Replica.State.ACTIVE);
-        cmd.setCheckLive(true);
-        cmd.setOnlyIfLeader(true);
-
-        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
-        ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
-      }
-
-      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
-          asyncId, requestMap);
-
-      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
-          + " on: " + parentShardLeader);
-
-      log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
-          + collectionName + " on " + parentShardLeader);
-
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
-      params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
-      for (int i = 0; i < subShardNames.size(); i++) {
-        String subShardName = subShardNames.get(i);
-        params.add(CoreAdminParams.TARGET_CORE, subShardName);
-      }
-      params.set(CoreAdminParams.RANGES, rangesStr);
-
-      ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
-          requestMap);
-
-      log.info("Index on shard: " + nodeName + " split into two successfully");
-
-      // apply buffered updates on sub-shards
-      for (int i = 0; i < subShardNames.size(); i++) {
-        String subShardName = subShardNames.get(i);
-
-        log.info("Applying buffered updates on : " + subShardName);
-
-        params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
-        params.set(CoreAdminParams.NAME, subShardName);
-
-        ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
-      }
-
-      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
-          " to apply buffered updates", asyncId, requestMap);
-
-      log.info("Successfully applied buffered updates on : " + subShardNames);
-
-      // Replica creation for the new Slices
-
-      // look at the replication factor and see if it matches reality
-      // if it does not, find best nodes to create more cores
-
-      // TODO: Have replication factor decided in some other way instead of numShards for the parent
-
-      int repFactor = parentSlice.getReplicas().size();
-
-      // we need to look at every node and see how many cores it serves
-      // add our new cores to existing nodes serving the least number of cores
-      // but (for now) require that each core goes on a distinct node.
-
-      // TODO: add smarter options that look at the current number of cores per
-      // node?
-      // for now we just go random
-      Set<String> nodes = clusterState.getLiveNodes();
-      List<String> nodeList = new ArrayList<>(nodes.size());
-      nodeList.addAll(nodes);
-
-      // TODO: Have maxShardsPerNode param for this operation?
-
-      // Remove the node that hosts the parent shard for replica creation.
-      nodeList.remove(nodeName);
-
-      // TODO: change this to handle sharding a slice into > 2 sub-shards.
-
-      List<ReplicaPosition> replicaPositions = Assign.identifyNodes(ocmh.cloudManager,
-          clusterState,
-          nodeList, // candidate nodes computed above, which exclude the parent leader's node
-          collectionName,
-          new ZkNodeProps(collection.getProperties()),
-          subSlices, repFactor - 1, 0, 0);
-      sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
-
-      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
-
-      for (ReplicaPosition replicaPosition : replicaPositions) {
-        String sliceName = replicaPosition.shard;
-        String subShardNodeName = replicaPosition.node;
-        String solrCoreName = collectionName + "_" + sliceName + "_replica" + (replicaPosition.index);
-
-        log.info("Creating replica shard " + solrCoreName + " as part of slice " + sliceName + " of collection "
-            + collectionName + " on " + subShardNodeName);
-
-        ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
-            ZkStateReader.COLLECTION_PROP, collectionName,
-            ZkStateReader.SHARD_ID_PROP, sliceName,
-            ZkStateReader.CORE_NAME_PROP, solrCoreName,
-            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
-            ZkStateReader.NODE_NAME_PROP, subShardNodeName,
-            CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
-
-        HashMap<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
-        propMap.put(COLLECTION_PROP, collectionName);
-        propMap.put(SHARD_ID_PROP, sliceName);
-        propMap.put("node", subShardNodeName);
-        propMap.put(CoreAdminParams.NAME, solrCoreName);
-        // copy over property params:
-        for (String key : message.keySet()) {
-          if (key.startsWith(COLL_PROP_PREFIX)) {
-            propMap.put(key, message.getStr(key));
-          }
-        }
-        // add async param
-        if (asyncId != null) {
-          propMap.put(ASYNC, asyncId);
-        }
-        // special flag param to instruct addReplica not to create the replica in cluster state again
-        propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
-
-        propMap.put(CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
-
-        replicas.add(propMap);
-      }
-
-      assert TestInjection.injectSplitFailureBeforeReplicaCreation();
-
-      long ephemeralOwner = leaderZnodeStat.getEphemeralOwner();
-      // compare against the ephemeralOwner of the parent leader node
-      leaderZnodeStat = zkStateReader.getZkClient().exists(ZkStateReader.LIVE_NODES_ZKNODE + "/" + parentShardLeader.getNodeName(), null, true);
-      if (leaderZnodeStat == null || ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
-        // put sub-shards in recovery_failed state
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-        for (String subSlice : subSlices) {
-          propMap.put(subSlice, Slice.State.RECOVERY_FAILED.toString());
-        }
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        ZkNodeProps m = new ZkNodeProps(propMap);
-        inQueue.offer(Utils.toJSON(m));
-
-        if (leaderZnodeStat == null)  {
-          // the leader is not live anymore, fail the split!
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "The shard leader node: " + parentShardLeader.getNodeName() + " is not live anymore!");
-        } else if (ephemeralOwner != leaderZnodeStat.getEphemeralOwner()) {
-          // there's a new leader, fail the split!
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-              "The zk session id for the shard leader node: " + parentShardLeader.getNodeName() + " has changed from "
-                  + ephemeralOwner + " to " + leaderZnodeStat.getEphemeralOwner() + ". This can cause data loss so we must abort the split");
-        }
-      }
-
-      // we must set the slice state into recovery before actually creating the replica cores
-      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
-      // always gets a chance to execute. See SOLR-7673
-
-      if (repFactor == 1) {
-        // switch sub shard states to 'active'
-        log.info("Replication factor is 1 so switching shard states");
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-        propMap.put(slice.get(), Slice.State.INACTIVE.toString());
-        for (String subSlice : subSlices) {
-          propMap.put(subSlice, Slice.State.ACTIVE.toString());
-        }
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        ZkNodeProps m = new ZkNodeProps(propMap);
-        inQueue.offer(Utils.toJSON(m));
-      } else {
-        log.info("Requesting shard state be set to 'recovery'");
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-        for (String subSlice : subSlices) {
-          propMap.put(subSlice, Slice.State.RECOVERY.toString());
-        }
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        ZkNodeProps m = new ZkNodeProps(propMap);
-        inQueue.offer(Utils.toJSON(m));
-      }
-
-      // now actually create replica cores on sub shard nodes
-      for (Map<String, Object> replica : replicas) {
-        ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
-      }
-
-      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
-
-      log.info("Successfully created all replica shards for all sub-slices " + subSlices);
-
-      ocmh.commit(results, slice.get(), parentShardLeader);
-
-      return true;
-    } catch (SolrException e) {
-      throw e;
-    } catch (Exception e) {
-      log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
-    } finally {
-      if (sessionWrapper != null) sessionWrapper.release();
-    }
-  }
-
-  public static Slice getParentSlice(ClusterState clusterState, String collectionName, AtomicReference<String> slice, String splitKey) {
-    DocCollection collection = clusterState.getCollection(collectionName);
-    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
-
-    Slice parentSlice;
-
-    if (slice.get() == null) {
-      if (router instanceof CompositeIdRouter) {
-        Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
-        if (searchSlices.isEmpty()) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
-        }
-        if (searchSlices.size() > 1) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-              "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
-        }
-        parentSlice = searchSlices.iterator().next();
-        slice.set(parentSlice.getName());
-        log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
-      } else {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
-                + router.getClass().getName());
-      }
-    } else {
-      parentSlice = collection.getSlice(slice.get());
-    }
-
-    if (parentSlice == null) {
-      // no chance of the collection being null because ClusterState#getCollection(String) would have thrown
-      // an exception already
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
-    }
-    return parentSlice;
-  }
-
-  public static String fillRanges(SolrCloudManager cloudManager, ZkNodeProps message, DocCollection collection, Slice parentSlice,
-                                List<DocRouter.Range> subRanges, List<String> subSlices, List<String> subShardNames) {
-    String splitKey = message.getStr("split.key");
-    DocRouter.Range range = parentSlice.getRange();
-    if (range == null) {
-      range = new PlainIdRouter().fullRange();
-    }
-    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
-
-    String rangesStr = message.getStr(CoreAdminParams.RANGES);
-    if (rangesStr != null) {
-      String[] ranges = rangesStr.split(",");
-      if (ranges.length == 0 || ranges.length == 1) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
-      } else {
-        for (int i = 0; i < ranges.length; i++) {
-          String r = ranges[i];
-          try {
-            subRanges.add(DocRouter.DEFAULT.fromString(r));
-          } catch (Exception e) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
-          }
-          if (!subRanges.get(i).isSubsetOf(range)) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-                "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
-          }
-        }
-        List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
-        Collections.sort(temp);
-        if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-              "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
-        }
-        for (int i = 1; i < temp.size(); i++) {
-          if (temp.get(i - 1).max + 1 != temp.get(i).min) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
-                + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
-          }
-        }
-      }
-    } else if (splitKey != null) {
-      if (router instanceof CompositeIdRouter) {
-        CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
-        List<DocRouter.Range> tmpSubRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
-        if (tmpSubRanges.size() == 1) {
-          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
-              + " has a hash range that is exactly equal to hash range of shard: " + parentSlice.getName());
-        }
-        for (DocRouter.Range subRange : tmpSubRanges) {
-          if (subRange.min == subRange.max) {
-            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
-          }
-        }
-        subRanges.addAll(tmpSubRanges);
-        log.info("Partitioning parent shard " + parentSlice.getName() + " range: " + parentSlice.getRange() + " yields: " + subRanges);
-        rangesStr = "";
-        for (int i = 0; i < subRanges.size(); i++) {
-          DocRouter.Range subRange = subRanges.get(i);
-          rangesStr += subRange.toString();
-          if (i < subRanges.size() - 1) rangesStr += ',';
-        }
-      }
-    } else {
-      // todo: fixed to two partitions?
-      subRanges.addAll(router.partitionRange(2, range));
-    }
-
-    for (int i = 0; i < subRanges.size(); i++) {
-      String subSlice = parentSlice.getName() + "_" + i;
-      subSlices.add(subSlice);
-      String subShardName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), collection, subSlice, Replica.Type.NRT);
-      subShardNames.add(subShardName);
-    }
-    return rangesStr;
-  }
-}
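
fillRanges above accepts user-supplied hash ranges only if, once sorted, they
tile the parent shard's range exactly: the extremes must match the parent's
bounds and each adjacent pair must meet at max + 1 == min, with no gaps or
overlaps. A self-contained sketch of that invariant (Range here is a
hypothetical stand-in for DocRouter.Range):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Illustration of the coverage check in SplitShardCmd.fillRanges.
    public class RangeCoverageSketch {
      static class Range {
        final int min, max;
        Range(int min, int max) { this.min = min; this.max = max; }
      }

      static boolean tilesExactly(Range parent, List<Range> subRanges) {
        List<Range> sorted = new ArrayList<>(subRanges); // copy; keep caller's order
        sorted.sort(Comparator.comparingInt(r -> r.min));
        if (sorted.get(0).min != parent.min
            || sorted.get(sorted.size() - 1).max != parent.max) {
          return false; // extremes don't match the parent's bounds
        }
        for (int i = 1; i < sorted.size(); i++) {
          if (sorted.get(i - 1).max + 1 != sorted.get(i).min) {
            return false; // gap or overlap between adjacent sub-ranges
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Range parent = new Range(0, 99);
        System.out.println(tilesExactly(parent,
            List.of(new Range(50, 99), new Range(0, 49)))); // true
        System.out.println(tilesExactly(parent,
            List.of(new Range(0, 49), new Range(51, 99)))); // false: 50 uncovered
      }
    }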

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
deleted file mode 100644
index 6a55cfd..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/UtilizeNodeCmd.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
-import org.apache.solr.client.solrj.request.V2Request;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.params.AutoScalingParams.NODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-public class UtilizeNodeCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public UtilizeNodeCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    ocmh.checkRequired(message, NODE);
-    String nodeName = message.getStr(NODE);
-    String async = message.getStr(ASYNC);
-    AutoScalingConfig autoScalingConfig = ocmh.overseer.getSolrCloudManager().getDistribStateManager().getAutoScalingConfig();
-
-    //first look for any violation that may use this replica
-    List<ZkNodeProps> requests = new ArrayList<>();
-    //first look for suggestions if any
-    List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(autoScalingConfig, ocmh.overseer.getSolrCloudManager());
-    for (Suggester.SuggestionInfo suggestionInfo : suggestions) {
-      log.info("op: " + suggestionInfo.getOperation());
-      String coll = null;
-      List<String> pieces = StrUtils.splitSmart(suggestionInfo.getOperation().getPath(), '/');
-      if (pieces.size() > 1) {
-        coll = pieces.get(2);
-      } else {
-        continue;
-      }
-      log.info("coll: " + coll);
-      if (suggestionInfo.getOperation() instanceof V2Request) {
-        String targetNode = (String) Utils.getObjectByPath(suggestionInfo.getOperation(), true, "command/move-replica/targetNode");
-        if (Objects.equals(targetNode, nodeName)) {
-          String replica = (String) Utils.getObjectByPath(suggestionInfo.getOperation(), true, "command/move-replica/replica");
-          requests.add(new ZkNodeProps(COLLECTION_PROP, coll,
-              CollectionParams.TARGET_NODE, targetNode,
-              ASYNC, async,
-              REPLICA_PROP, replica));
-        }
-      }
-    }
-    executeAll(requests);
-    PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getSession(ocmh.overseer.getSolrCloudManager());
-    Policy.Session session =  sessionWrapper.get();
-    for (; ; ) {
-      Suggester suggester = session.getSuggester(MOVEREPLICA)
-          .hint(Suggester.Hint.TARGET_NODE, nodeName);
-      session = suggester.getSession();
-      SolrRequest request = suggester.getSuggestion();
-      if (request == null) break;
-      requests.add(new ZkNodeProps(COLLECTION_PROP, request.getParams().get(COLLECTION_PROP),
-          CollectionParams.TARGET_NODE, request.getParams().get(CollectionParams.TARGET_NODE),
-          REPLICA_PROP, request.getParams().get(REPLICA_PROP),
-          ASYNC, request.getParams().get(ASYNC)));
-    }
-    sessionWrapper.returnSession(session);
-    try {
-      executeAll(requests);
-    } finally {
-      sessionWrapper.release();
-    }
-  }
-
-  private void executeAll(List<ZkNodeProps> requests) throws Exception {
-    if (requests.isEmpty()) return;
-    for (ZkNodeProps props : requests) {
-      NamedList result = new NamedList();
-      ocmh.commandMap.get(MOVEREPLICA)
-          .call(ocmh.overseer.getSolrCloudManager().getClusterStateProvider().getClusterState(),
-              props,
-              result);
-    }
-    requests.clear();
-  }
-
-}
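
UtilizeNodeCmd drains the policy suggester in a loop: request the next
MOVEREPLICA suggestion hinted at the target node, stop when it returns null,
and queue each suggestion as a request to execute. The generic shape of that
loop, with a Supplier standing in for the autoscaling Suggester (a sketch,
not the autoscaling API):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Supplier;

    // Generic form of the drain loop in UtilizeNodeCmd.call(); the Supplier
    // stands in for session.getSuggester(MOVEREPLICA).hint(...).getSuggestion().
    public class SuggestionDrainSketch {
      static <T> List<T> drain(Supplier<T> nextSuggestion) {
        List<T> requests = new ArrayList<>();
        for (;;) {
          T suggestion = nextSuggestion.get();
          if (suggestion == null) break; // nothing more to move to this node
          requests.add(suggestion);
        }
        return requests;
      }

      public static void main(String[] args) {
        Iterator<String> moves = List.of("move r1", "move r2").iterator();
        // canned supplier that runs dry after two suggestions
        System.out.println(drain(() -> moves.hasNext() ? moves.next() : null));
      }
    }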

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
new file mode 100644
index 0000000..6b4e427
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.cloud.CloudUtil;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
+public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public AddReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    addReplica(state, message, results, null);
+  }
+
+  ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+      throws IOException, InterruptedException {
+    log.debug("addReplica() : {}", Utils.toJSONString(message));
+    boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+    boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+    final String asyncId = message.getStr(ASYNC);
+
+    AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
+    message = assignReplicaDetails(ocmh.cloudManager, clusterState, message, sessionWrapper);
+
+    String collection = message.getStr(COLLECTION_PROP);
+    DocCollection coll = clusterState.getCollection(collection);
+
+    String node = message.getStr(CoreAdminParams.NODE);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String coreName = message.getStr(CoreAdminParams.NAME);
+    String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
+    int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
+    Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
+    boolean parallel = message.getBool("parallel", false);
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    if (!Overseer.isLegacy(zkStateReader)) {
+      if (!skipCreateReplicaInClusterState) {
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+            ZkStateReader.COLLECTION_PROP, collection,
+            ZkStateReader.SHARD_ID_PROP, shard,
+            ZkStateReader.CORE_NAME_PROP, coreName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
+            ZkStateReader.NODE_NAME_PROP, node,
+            ZkStateReader.REPLICA_TYPE, replicaType.name());
+        if (coreNodeName != null) {
+          props = props.plus(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+        }
+        try {
+          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+        } catch (Exception e) {
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
+        }
+      }
+      params.set(CoreAdminParams.CORE_NODE_NAME,
+          ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
+    }
+
+    String configName = zkStateReader.readConfigName(collection);
+    String routeKey = message.getStr(ShardParams._ROUTE_);
+    String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
+    String ulogDir = message.getStr(CoreAdminParams.ULOG_DIR);
+    String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
+
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+    params.set(CoreAdminParams.NAME, coreName);
+    params.set(COLL_CONF, configName);
+    params.set(CoreAdminParams.COLLECTION, collection);
+    params.set(CoreAdminParams.REPLICA_TYPE, replicaType.name());
+    if (shard != null) {
+      params.set(CoreAdminParams.SHARD, shard);
+    } else if (routeKey != null) {
+      Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
+      if (slices.isEmpty()) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
+      } else {
+        params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
+      }
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
+    }
+    if (dataDir != null) {
+      params.set(CoreAdminParams.DATA_DIR, dataDir);
+    }
+    if (ulogDir != null) {
+      params.set(CoreAdminParams.ULOG_DIR, ulogDir);
+    }
+    if (instanceDir != null) {
+      params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
+    }
+    if (coreNodeName != null) {
+      params.set(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+    }
+    ocmh.addPropertyParams(message, params);
+
+    // For tracking async calls.
+    Map<String,String> requestMap = new HashMap<>();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
+
+    final String fnode = node;
+    final String fcoreName = coreName;
+
+    Runnable runnable = () -> {
+      ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
+      ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
+      if (sessionWrapper.get() != null) {
+        sessionWrapper.get().release();
+      }
+      if (onComplete != null) onComplete.run();
+    };
+
+    if (!parallel || waitForFinalState) {
+      if (waitForFinalState) {
+        SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
+        ActiveReplicaWatcher watcher = new ActiveReplicaWatcher(collection, null, Collections.singletonList(coreName), latch);
+        try {
+          zkStateReader.registerCollectionStateWatcher(collection, watcher);
+          runnable.run();
+          if (!latch.await(timeout, TimeUnit.SECONDS)) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting " + timeout + " seconds for replica to become active.");
+          }
+        } finally {
+          zkStateReader.removeCollectionStateWatcher(collection, watcher);
+        }
+      } else {
+        runnable.run();
+      }
+    } else {
+      ocmh.tpe.submit(runnable);
+    }
+
+
+    return new ZkNodeProps(
+        ZkStateReader.COLLECTION_PROP, collection,
+        ZkStateReader.SHARD_ID_PROP, shard,
+        ZkStateReader.CORE_NAME_PROP, coreName,
+        ZkStateReader.NODE_NAME_PROP, node
+    );
+  }
+
+  public static ZkNodeProps assignReplicaDetails(SolrCloudManager cloudManager, ClusterState clusterState,
+                                                 ZkNodeProps message, AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper) throws IOException, InterruptedException {
+    boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+
+    String collection = message.getStr(COLLECTION_PROP);
+    String node = message.getStr(CoreAdminParams.NODE);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String coreName = message.getStr(CoreAdminParams.NAME);
+    String coreNodeName = message.getStr(CoreAdminParams.CORE_NODE_NAME);
+    Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
+    if (StringUtils.isBlank(coreName)) {
+      coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
+    }
+
+    DocCollection coll = clusterState.getCollection(collection);
+    if (coll == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+    }
+    if (coll.getSlice(shard) == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Collection: " + collection + " shard: " + shard + " does not exist");
+    }
+
+    // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
+    if (!skipCreateReplicaInClusterState) {
+      if (CloudUtil.usePolicyFramework(coll, cloudManager)) {
+        if (node == null) {
+          if (coll.getPolicyName() != null) message.getProperties().put(Policy.POLICY, coll.getPolicyName());
+          node = Assign.identifyNodes(cloudManager,
+              clusterState,
+              Collections.emptyList(),
+              collection,
+              message,
+              Collections.singletonList(shard),
+              replicaType == Replica.Type.NRT ? 1 : 0, // request exactly one replica of the given type,
+              replicaType == Replica.Type.TLOG ? 1 : 0, // matching how SplitShardCmd passes per-type counts
+              replicaType == Replica.Type.PULL ? 1 : 0
+          ).get(0).node;
+          sessionWrapper.set(PolicyHelper.getLastSessionWrapper(true));
+        }
+      } else {
+        node = Assign.getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+            cloudManager).get(0).nodeName;// TODO: use replica type in this logic too
+      }
+    }
+    log.info("Node Identified {} for creating new replica", node);
+
+    if (!clusterState.liveNodesContain(node)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+    }
+    if (coreName == null) {
+      coreName = Assign.buildSolrCoreName(cloudManager.getDistribStateManager(), coll, shard, replicaType);
+    } else if (!skipCreateReplicaInClusterState) {
+      //Validate that the core name is unique in that collection
+      for (Slice slice : coll.getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          String replicaCoreName = replica.getStr(CORE_NAME_PROP);
+          if (coreName.equals(replicaCoreName)) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
+                " for this collection");
+          }
+        }
+      }
+    }
+    if (coreNodeName != null) {
+      message = message.plus(CoreAdminParams.CORE_NODE_NAME, coreNodeName);
+    }
+    message = message.plus(CoreAdminParams.NAME, coreName);
+    message = message.plus(CoreAdminParams.NODE, node);
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
new file mode 100644
index 0000000..e7ce583
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.cloud.rule.Rule;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.NumberUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+
+public class Assign {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
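+  /**
+   * Atomically increments and returns a per-collection counter stored in ZooKeeper, creating the
+   * counter node (seeded with {@code defaultValue}) if it does not yet exist. Uses an optimistic
+   * read-modify-write loop that retries on version conflicts. Used to generate unique core node
+   * names and core names.
+   */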
+  public static int incAndGetId(DistribStateManager stateManager, String collection, int defaultValue) {
+    String path = "/collections/"+collection;
+    try {
+      if (!stateManager.hasData(path)) {
+        try {
+          stateManager.makePath(path);
+        } catch (AlreadyExistsException e) {
+          // it's okay if another beats us creating the node
+        }
+      }
+      path += "/counter";
+      if (!stateManager.hasData(path)) {
+        try {
+          stateManager.createData(path, NumberUtils.intToBytes(defaultValue), CreateMode.PERSISTENT);
+        } catch (AlreadyExistsException e) {
+          // it's okay if another beats us creating the node
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the interrupt status rather than clearing it
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating counter node in ZooKeeper for collection: " + collection, e);
+    } catch (IOException | KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating counter node in ZooKeeper for collection: " + collection, e);
+    }
+
+    while (true) {
+      try {
+        int version = 0;
+        int currentId = 0;
+        VersionedData data = stateManager.getData(path, null);
+        if (data != null) {
+          currentId = NumberUtils.bytesToInt(data.getData());
+          version = data.getVersion();
+        }
+        byte[] bytes = NumberUtils.intToBytes(++currentId);
+        stateManager.setData(path, bytes, version);
+        return currentId;
+      } catch (BadVersionException e) {
+        continue; // stale version: re-read the counter and retry
+      } catch (IOException | KeeperException e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error incrementing counter in ZooKeeper for collection: " + collection, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt status rather than clearing it
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error incrementing counter in ZooKeeper for collection: " + collection, e);
+      }
+    }
+  }
+
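+  /**
+   * Returns a new "core_nodeN" name for a replica of this collection, drawing N from the
+   * collection's ZooKeeper counter and retrying until the name does not collide with an
+   * existing replica.
+   */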
+  public static String assignCoreNodeName(DistribStateManager stateManager, DocCollection collection) {
+    // for backward compatibility
+    int defaultValue = defaultCounterValue(collection, false);
+    String coreNodeName = "core_node" + incAndGetId(stateManager, collection.getName(), defaultValue);
+    while (collection.getReplica(coreNodeName) != null) {
+      // there is a small chance that the new coreNodeName is not unique for an existing collection,
+      // but it is guaranteed to be unique for new collections
+      coreNodeName = "core_node" + incAndGetId(stateManager, collection.getName(), defaultValue);
+    }
+    return coreNodeName;
+  }
+
+  /**
+   * Assigns a shard id: creates a new shard ("shard1", "shard2", ...) while fewer than
+   * numShards slices exist, otherwise returns the existing shard with the fewest replicas.
+   *
+   * @return the assigned shard id
+   */
+  public static String assignShard(DocCollection collection, Integer numShards) {
+    if (numShards == null) {
+      numShards = 1;
+    }
+    Map<String, Slice> sliceMap = collection != null ? collection.getActiveSlicesMap() : null;
+
+    // TODO: now that we create shards ahead of time, is this code needed?  Esp since hash ranges aren't assigned when creating via this method?
+
+    if (sliceMap == null) {
+      return "shard1";
+    }
+
+    List<String> shardIdNames = new ArrayList<>(sliceMap.keySet());
+
+    if (shardIdNames.size() < numShards) {
+      return "shard" + (shardIdNames.size() + 1);
+    }
+
+    // TODO: don't need to sort to find shard with fewest replicas!
+
+    // else figure out which shard needs more replicas
+    final Map<String, Integer> map = new HashMap<>();
+    for (String shardId : shardIdNames) {
+      int cnt = sliceMap.get(shardId).getReplicasMap().size();
+      map.put(shardId, cnt);
+    }
+
+    Collections.sort(shardIdNames, (o1, o2) -> Integer.compare(map.get(o1), map.get(o2)));
+
+    return shardIdNames.get(0);
+  }
+
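+  /**
+   * Builds a core name of the form {@code <collection>_<shard>_replica_<t><N>}, where {@code <t>}
+   * is the first letter of the replica type (n, t, or p).
+   */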
+  private static String buildSolrCoreName(String collectionName, String shard, Replica.Type type, int replicaNum) {
+    // TODO: Adding the suffix is great for debugging, but may be an issue if at some point we want to support a way to change replica type
+    return String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, shard, type.name().substring(0,1).toLowerCase(Locale.ROOT), replicaNum);
+  }
+
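+  /**
+   * Seed value for a collection's counter node: 0 for a brand new collection, otherwise twenty
+   * times the larger of the current replica count and replicationFactor * numSlices, so that
+   * generated names are unlikely to collide with names assigned before the counter existed.
+   */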
+  private static int defaultCounterValue(DocCollection collection, boolean newCollection) {
+    if (newCollection) return 0;
+    int defaultValue = collection.getReplicas().size();
+    if (collection.getReplicationFactor() != null) {
+      // numReplicas and replicationFactor * numSlices may not be equal
+      // when many addReplica or deleteReplica operations have been executed
+      defaultValue = Math.max(defaultValue,
+          collection.getReplicationFactor() * collection.getSlices().size());
+    }
+    return defaultValue * 20;
+  }
+
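+  /**
+   * Returns a unique core name for a new replica of the given shard, drawing the replica number
+   * from the collection's ZooKeeper counter and retrying on collision with an existing core name.
+   */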
+  public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type, boolean newCollection) {
+    Slice slice = collection.getSlice(shard);
+    int defaultValue = defaultCounterValue(collection, newCollection);
+    int replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
+    String coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
+    while (existCoreName(coreName, slice)) {
+      replicaNum = incAndGetId(stateManager, collection.getName(), defaultValue);
+      coreName = buildSolrCoreName(collection.getName(), shard, type, replicaNum);
+    }
+    return coreName;
+  }
+
+  public static String buildSolrCoreName(DistribStateManager stateManager, DocCollection collection, String shard, Replica.Type type) {
+    return buildSolrCoreName(stateManager, collection, shard, type, false);
+  }
+
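+  /** Returns true if a replica in the slice already uses the given core name. */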
+  private static boolean existCoreName(String coreName, Slice slice) {
+    if (slice == null) return false;
+    for (Replica replica : slice.getReplicas()) {
+      if (coreName.equals(replica.getStr(CORE_NAME_PROP))) {
+        return true;
+      }
+    }
+    return false;
+  }
+
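+  /**
+   * Resolves the candidate nodes for new replicas: the intersection of the live nodes with
+   * createNodeSet when one is supplied in the message, otherwise all live nodes; the result is
+   * shuffled unless createNodeSet.shuffle=false.
+   */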
+  public static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
+    // TODO: add smarter options that look at the current number of cores per
+    // node?
+    // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
+
+    List<String> nodeList;
+
+    final String createNodeSetStr = message.getStr(CREATE_NODE_SET);
+    final List<String> createNodeList = (createNodeSetStr == null) ? null :
+        StrUtils.splitSmart((OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY.equals(createNodeSetStr) ?
+            "" : createNodeSetStr), ",", true);
+
+    if (createNodeList != null) {
+      nodeList = new ArrayList<>(createNodeList);
+      nodeList.retainAll(liveNodes);
+      if (message.getBool(OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE,
+          OverseerCollectionMessageHandler.CREATE_NODE_SET_SHUFFLE_DEFAULT)) {
+        Collections.shuffle(nodeList, random);
+      }
+    } else {
+      nodeList = new ArrayList<>(liveNodes);
+      Collections.shuffle(nodeList, random);
+    }
+
+    return nodeList;
+  }
+
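+  /**
+   * Computes node placements for the requested replicas. With no placement rules, no collection
+   * policy and an empty cluster policy, replicas are spread round-robin across nodeList;
+   * otherwise placement is delegated to the rule-based {@link ReplicaAssigner} or to the
+   * autoscaling policy framework.
+   */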
+  public static List<ReplicaPosition> identifyNodes(SolrCloudManager cloudManager,
+                                                    ClusterState clusterState,
+                                                    List<String> nodeList,
+                                                    String collectionName,
+                                                    ZkNodeProps message,
+                                                    List<String> shardNames,
+                                                    int numNrtReplicas,
+                                                    int numTlogReplicas,
+                                                    int numPullReplicas) throws IOException, InterruptedException {
+    List<Map> rulesMap = (List) message.get("rule");
+    String policyName = message.getStr(POLICY);
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+
+    if (rulesMap == null && policyName == null && autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+      log.debug("Identify nodes using default");
+      int i = 0;
+      List<ReplicaPosition> result = new ArrayList<>();
+      for (String aShard : shardNames) {
+        for (Map.Entry<Replica.Type, Integer> e : ImmutableMap.of(Replica.Type.NRT, numNrtReplicas,
+            Replica.Type.TLOG, numTlogReplicas,
+            Replica.Type.PULL, numPullReplicas
+        ).entrySet()) {
+          for (int j = 0; j < e.getValue(); j++) {
+            result.add(new ReplicaPosition(aShard, j, e.getKey(), nodeList.get(i % nodeList.size())));
+            i++;
+          }
+        }
+      }
+      return result;
+    } else {
+      if (numTlogReplicas + numPullReplicas != 0 && rulesMap != null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            Replica.Type.TLOG + " or " + Replica.Type.PULL + " replica types not supported with placement rules or cluster policies");
+      }
+    }
+
+    if (rulesMap != null && !rulesMap.isEmpty()) {
+      List<Rule> rules = new ArrayList<>();
+      for (Object map : rulesMap) rules.add(new Rule((Map) map));
+      Map<String, Integer> shardVsReplicaCount = new HashMap<>();
+
+      for (String shard : shardNames) shardVsReplicaCount.put(shard, numNrtReplicas);
+      ReplicaAssigner replicaAssigner = new ReplicaAssigner(rules,
+          shardVsReplicaCount,
+          (List<Map>) message.get(SNITCH),
+          new HashMap<>(), // this is a new collection, so there are no nodes in any shard
+          nodeList,
+          cloudManager,
+          clusterState);
+
+      Map<ReplicaPosition, String> nodeMappings = replicaAssigner.getNodeMappings();
+      return nodeMappings.entrySet().stream()
+          .map(e -> new ReplicaPosition(e.getKey().shard, e.getKey().index, e.getKey().type, e.getValue()))
+          .collect(Collectors.toList());
+    } else {
+      if (message.getStr(CREATE_NODE_SET) == null)
+        nodeList = Collections.emptyList(); // unless explicitly specified, do not pass a node list to the policy
+      return getPositionsUsingPolicy(collectionName,
+          shardNames, numNrtReplicas, numTlogReplicas, numPullReplicas, policyName, cloudManager, nodeList);
+    }
+  }
+
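+  /**
+   * Tracks, per node, how many cores of this collection and how many cores in total live there.
+   * {@link #weight()} multiplies the per-collection count by 100 so that nodes hosting fewer
+   * replicas of this collection sort first, with the total core count as a tie-breaker.
+   */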
+  static class ReplicaCount {
+    public final String nodeName;
+    public int thisCollectionNodes = 0;
+    public int totalNodes = 0;
+
+    ReplicaCount(String nodeName) {
+      this.nodeName = nodeName;
+    }
+
+    public int weight() {
+      return (thisCollectionNodes * 100) + totalNodes;
+    }
+  }
+
+  // Only called from createShard and addReplica (so far).
+  //
+  // Gets a list of candidate nodes to put the required replica(s) on. Throws an error if not enough
+  // replicas can be created on live nodes given maxShardsPerNode, the replication factor (if called
+  // from createShard), etc.
+  public static List<ReplicaCount> getNodesForNewReplicas(ClusterState clusterState, String collectionName,
+                                                          String shard, int nrtReplicas,
+                                                          Object createNodeSet, SolrCloudManager cloudManager) throws IOException, InterruptedException {
+    log.debug("getNodesForNewReplicas() shard: {} , replicas : {} , createNodeSet {}", shard, nrtReplicas, createNodeSet );
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Integer maxShardsPerNode = coll.getMaxShardsPerNode();
+    List<String> createNodeList = null;
+
+    if (createNodeSet instanceof List) {
+      createNodeList = (List) createNodeSet;
+    } else {
+      createNodeList = createNodeSet == null ? null : StrUtils.splitSmart((String) createNodeSet, ",", true);
+    }
+
+    HashMap<String, ReplicaCount> nodeNameVsShardCount = getNodeNameVsShardCount(collectionName, clusterState, createNodeList);
+
+    if (createNodeList == null) { // We only care if we haven't been told to put new replicas on specific nodes.
+      int availableSlots = 0;
+      for (Map.Entry<String, ReplicaCount> ent : nodeNameVsShardCount.entrySet()) {
+        //ADDREPLICA can put more than maxShardsPerNode on an instance, so this test is necessary.
+        if (maxShardsPerNode > ent.getValue().thisCollectionNodes) {
+          availableSlots += (maxShardsPerNode - ent.getValue().thisCollectionNodes);
+        }
+      }
+      if (availableSlots < nrtReplicas) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            String.format(Locale.ROOT, "Cannot create %d new replicas for collection %s given the current number of live nodes and a maxShardsPerNode of %d",
+                nrtReplicas, collectionName, maxShardsPerNode));
+      }
+    }
+
+    List l = (List) coll.get(DocCollection.RULE);
+    List<ReplicaPosition> replicaPositions = null;
+    if (l != null) {
+      // TODO: make it so that this method doesn't require access to CC
+      replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cloudManager, coll, createNodeList, l);
+    }
+    String policyName = coll.getStr(POLICY);
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
+      replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
+          policyName, cloudManager, createNodeList);
+    }
+
+    if (replicaPositions != null) {
+      List<ReplicaCount> repCounts = new ArrayList<>();
+      for (ReplicaPosition p : replicaPositions) {
+        repCounts.add(new ReplicaCount(p.node));
+      }
+      return repCounts;
+    }
+
+    ArrayList<ReplicaCount> sortedNodeList = new ArrayList<>(nodeNameVsShardCount.values());
+    Collections.sort(sortedNodeList, (x, y) -> Integer.compare(x.weight(), y.weight()));
+    return sortedNodeList;
+  }
+
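+  /**
+   * Delegates replica placement to the autoscaling policy framework via
+   * {@link PolicyHelper#getReplicaLocations}.
+   */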
+  public static List<ReplicaPosition> getPositionsUsingPolicy(String collName, List<String> shardNames,
+                                                              int nrtReplicas,
+                                                              int tlogReplicas,
+                                                              int pullReplicas,
+                                                              String policyName, SolrCloudManager cloudManager,
+                                                              List<String> nodesList) throws IOException, InterruptedException {
+    log.debug("shardnames {} NRT {} TLOG {} PULL {} , policy {}, nodeList {}", shardNames, nrtReplicas, tlogReplicas, pullReplicas, policyName, nodesList);
+    List<ReplicaPosition> replicaPositions = null;
+    AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
+    try {
+      Map<String, String> kvMap = Collections.singletonMap(collName, policyName);
+      replicaPositions = PolicyHelper.getReplicaLocations(
+          collName,
+          autoScalingConfig,
+          cloudManager,
+          kvMap,
+          shardNames,
+          nrtReplicas,
+          tlogReplicas,
+          pullReplicas,
+          nodesList);
+      return replicaPositions;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error getting replica locations", e);
+    } finally {
+      if (log.isTraceEnabled()) {
+        if (replicaPositions != null)
+          log.trace("REPLICA_POSITIONS: " + Utils.toJSONString(Utils.getDeepCopy(replicaPositions, 7, true)));
+        log.trace("AUTOSCALING_CONF: " + Utils.toJSONString(autoScalingConfig));
+      }
+    }
+  }
+
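+  /**
+   * Computes placements for {@code numberOfNodes} new replicas of one shard using the rule-based
+   * {@link ReplicaAssigner}, seeded with the collection's current replicas-per-node counts and
+   * snitch configuration.
+   */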
+  private static List<ReplicaPosition> getNodesViaRules(ClusterState clusterState, String shard, int numberOfNodes,
+                                                        SolrCloudManager cloudManager, DocCollection coll, List<String> createNodeList, List l) {
+    ArrayList<Rule> rules = new ArrayList<>();
+    for (Object o : l) rules.add(new Rule((Map) o));
+    Map<String, Map<String, Integer>> shardVsNodes = new LinkedHashMap<>();
+    for (Slice slice : coll.getSlices()) {
+      LinkedHashMap<String, Integer> n = new LinkedHashMap<>();
+      shardVsNodes.put(slice.getName(), n);
+      for (Replica replica : slice.getReplicas()) {
+        Integer count = n.get(replica.getNodeName());
+        if (count == null) count = 0;
+        n.put(replica.getNodeName(), ++count);
+      }
+    }
+    List snitches = (List) coll.get(SNITCH);
+    List<String> nodesList = createNodeList == null ?
+        new ArrayList<>(clusterState.getLiveNodes()) :
+        createNodeList;
+    Map<ReplicaPosition, String> positions = new ReplicaAssigner(
+        rules,
+        Collections.singletonMap(shard, numberOfNodes),
+        snitches,
+        shardVsNodes,
+        nodesList, cloudManager, clusterState).getNodeMappings();
+
+    return positions.entrySet().stream().map(e -> e.getKey().setNode(e.getValue())).collect(Collectors.toList());
+  }
+
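+  /**
+   * Builds the candidate-node map. When createNodeList is given, only those nodes are considered
+   * and all of them must be live; otherwise every live node is counted, and nodes already holding
+   * maxShardsPerNode cores of this collection are dropped from the map.
+   */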
+  private static HashMap<String, ReplicaCount> getNodeNameVsShardCount(String collectionName,
+                                                                       ClusterState clusterState, List<String> createNodeList) {
+    Set<String> nodes = clusterState.getLiveNodes();
+
+    List<String> nodeList = new ArrayList<>(nodes);
+    if (createNodeList != null) nodeList.retainAll(createNodeList);
+
+    HashMap<String, ReplicaCount> nodeNameVsShardCount = new HashMap<>();
+    for (String s : nodeList) {
+      nodeNameVsShardCount.put(s, new ReplicaCount(s));
+    }
+    if (createNodeList != null) { // Overrides petty considerations about maxShardsPerNode
+      if (createNodeList.size() != nodeNameVsShardCount.size()) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "At least one of the node(s) specified " + createNodeList + " are not currently active in "
+                + nodeNameVsShardCount.keySet() + ", no action taken.");
+      }
+      return nodeNameVsShardCount;
+    }
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Integer maxShardsPerNode = coll.getMaxShardsPerNode();
+    Map<String, DocCollection> collections = clusterState.getCollectionsMap();
+    for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+      DocCollection c = entry.getValue();
+      // identify suitable nodes by checking the number of cores on each of them
+      for (Slice slice : c.getSlices()) {
+        Collection<Replica> replicas = slice.getReplicas();
+        for (Replica replica : replicas) {
+          ReplicaCount count = nodeNameVsShardCount.get(replica.getNodeName());
+          if (count != null) {
+            count.totalNodes++; // Used to "weigh" whether this node should be used later.
+            if (entry.getKey().equals(collectionName)) {
+              count.thisCollectionNodes++;
+              if (count.thisCollectionNodes >= maxShardsPerNode) nodeNameVsShardCount.remove(replica.getNodeName());
+            }
+          }
+        }
+      }
+    }
+
+    return nodeNameVsShardCount;
+  }
+}