Posted to commits@lucene.apache.org by no...@apache.org on 2016/08/19 05:44:45 UTC

[1/3] lucene-solr:master: SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes

Repository: lucene-solr
Updated Branches:
  refs/heads/master 9e1a25e77 -> bbd1efe5d


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java b/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
new file mode 100644
index 0000000..0f450bd
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+
+public class OverseerRoleCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+  private final CollectionAction operation;
+  private final OverseerNodePrioritizer overseerPrioritizer;
+
+
+
+  public OverseerRoleCmd(OverseerCollectionMessageHandler ocmh, CollectionAction operation, OverseerNodePrioritizer prioritizer) {
+    this.ocmh = ocmh;
+    this.operation = operation;
+    this.overseerPrioritizer = prioritizer;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    SolrZkClient zkClient = zkStateReader.getZkClient();
+    Map roles = null;
+    String node = message.getStr("node");
+
+    String roleName = message.getStr("role");
+    boolean nodeExists = zkClient.exists(ZkStateReader.ROLES, true);
+    if (nodeExists) {
+      roles = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true));
+    } else {
+      roles = new LinkedHashMap(1);
+    }
+
+    List nodeList = (List) roles.get(roleName);
+    if (nodeList == null) roles.put(roleName, nodeList = new ArrayList());
+    if (ADDROLE == operation) {
+      log.info("Overseer role added to {}", node);
+      if (!nodeList.contains(node)) nodeList.add(node);
+    } else if (REMOVEROLE == operation) {
+      log.info("Overseer role removed from {}", node);
+      nodeList.remove(node);
+    }
+
+    if (nodeExists) {
+      zkClient.setData(ZkStateReader.ROLES, Utils.toJSON(roles), true);
+    } else {
+      zkClient.create(ZkStateReader.ROLES, Utils.toJSON(roles), CreateMode.PERSISTENT, true);
+    }
+    // If there are too many nodes this command may time out, and dedicated
+    // overseers are most likely created when there are many nodes. So do this operation in a separate thread.
+    new Thread(() -> {
+      try {
+        overseerPrioritizer.prioritizeOverseerNodes(ocmh.myId);
+      } catch (Exception e) {
+        log.error("Error in prioritizing Overseer", e);
+      }
+
+    }).start();
+
+  }
+
+}
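
A quick sketch of how this command is driven may help. Both role operations
share the same class and differ only in the CollectionAction passed at
construction; the message keys match the ones read in call() above. The
ocmh, prioritizer and clusterState values here are assumed to be in scope,
so this is an illustration, not part of the commit:

    // Illustration only: one instance per action, distinguished by the
    // CollectionAction supplied to the constructor.
    OverseerCollectionMessageHandler.Cmd addRole =
        new OverseerRoleCmd(ocmh, CollectionAction.ADDROLE, prioritizer);

    // "node" and "role" are the keys call() reads from the message.
    ZkNodeProps msg = new ZkNodeProps("node", "127.0.0.1:8983_solr", "role", "overseer");
    addRole.call(clusterState, msg, new NamedList());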

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
new file mode 100644
index 0000000..22e2270
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
@@ -0,0 +1,122 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.util.stats.Snapshot;
+import org.apache.solr.util.stats.Timer;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OverseerStatusCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public OverseerStatusCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
+    results.add("leader", leaderNode);
+    Stat stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/queue", null, stat, true);
+    results.add("overseer_queue_size", stat.getNumChildren());
+    stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/queue-work", null, stat, true);
+    results.add("overseer_work_queue_size", stat.getNumChildren());
+    stat = new Stat();
+    zkStateReader.getZkClient().getData("/overseer/collection-queue-work", null, stat, true);
+    results.add("overseer_collection_queue_size", stat.getNumChildren());
+
+    NamedList overseerStats = new NamedList();
+    NamedList collectionStats = new NamedList();
+    NamedList stateUpdateQueueStats = new NamedList();
+    NamedList workQueueStats = new NamedList();
+    NamedList collectionQueueStats = new NamedList();
+    Overseer.Stats stats = ocmh.stats;
+    for (Map.Entry<String, Overseer.Stat> entry : stats.getStats().entrySet()) {
+      String key = entry.getKey();
+      NamedList<Object> lst = new SimpleOrderedMap<>();
+      if (key.startsWith("collection_"))  {
+        collectionStats.add(key.substring(11), lst);
+        int successes = stats.getSuccessCount(entry.getKey());
+        int errors = stats.getErrorCount(entry.getKey());
+        lst.add("requests", successes);
+        lst.add("errors", errors);
+        List<Overseer.FailedOp> failureDetails = stats.getFailureDetails(key);
+        if (failureDetails != null) {
+          List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
+          for (Overseer.FailedOp failedOp : failureDetails) {
+            SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
+            fail.add("request", failedOp.req.getProperties());
+            fail.add("response", failedOp.resp.getResponse());
+            failures.add(fail);
+          }
+          lst.add("recent_failures", failures);
+        }
+      } else if (key.startsWith("/overseer/queue_"))  {
+        stateUpdateQueueStats.add(key.substring(16), lst);
+      } else if (key.startsWith("/overseer/queue-work_"))  {
+        workQueueStats.add(key.substring(21), lst);
+      } else if (key.startsWith("/overseer/collection-queue-work_"))  {
+        collectionQueueStats.add(key.substring(32), lst);
+      } else  {
+        // overseer stats
+        overseerStats.add(key, lst);
+        int successes = stats.getSuccessCount(entry.getKey());
+        int errors = stats.getErrorCount(entry.getKey());
+        lst.add("requests", successes);
+        lst.add("errors", errors);
+      }
+      Timer timer = entry.getValue().requestTime;
+      Snapshot snapshot = timer.getSnapshot();
+      lst.add("totalTime", timer.getSum());
+      lst.add("avgRequestsPerMinute", timer.getMeanRate());
+      lst.add("5minRateRequestsPerMinute", timer.getFiveMinuteRate());
+      lst.add("15minRateRequestsPerMinute", timer.getFifteenMinuteRate());
+      lst.add("avgTimePerRequest", timer.getMean());
+      lst.add("medianRequestTime", snapshot.getMedian());
+      lst.add("75thPctlRequestTime", snapshot.get75thPercentile());
+      lst.add("95thPctlRequestTime", snapshot.get95thPercentile());
+      lst.add("99thPctlRequestTime", snapshot.get99thPercentile());
+      lst.add("999thPctlRequestTime", snapshot.get999thPercentile());
+    }
+    results.add("overseer_operations", overseerStats);
+    results.add("collection_operations", collectionStats);
+    results.add("overseer_queue", stateUpdateQueueStats);
+    results.add("overseer_internal_queue", workQueueStats);
+    results.add("collection_queue", collectionQueueStats);
+
+  }
+}
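
A note on the hard-coded substring offsets above: each one is just the
length of the matching key prefix, so a stat key is reduced to its bare
operation name. A minimal standalone sketch (not part of the commit):

    // The offsets 11, 16, 21 and 32 equal the prefix lengths.
    String key = "collection_createcollection";       // invented example key
    String prefix = "collection_";                     // prefix.length() == 11
    String op = key.substring(prefix.length());        // "createcollection"
    // Likewise "/overseer/queue_" (16), "/overseer/queue-work_" (21)
    // and "/overseer/collection-queue-work_" (32).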

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
new file mode 100644
index 0000000..af2215c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROPS;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+
+public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public RestoreCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    // TODO maybe we can inherit createCollection's options/code
+    String restoreCollectionName = message.getStr(COLLECTION_PROP);
+    String backupName = message.getStr(NAME); // of backup
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    String asyncId = message.getStr(ASYNC);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
+    Map<String, String> requestMap = new HashMap<>();
+
+    CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+
+    URI backupPath = repository.createURI(location, backupName);
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    BackupManager backupMgr = new BackupManager(repository, zkStateReader, restoreCollectionName);
+
+    Properties properties = backupMgr.readBackupProperties(location, backupName);
+    String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
+    DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
+
+    //Upload the configs
+    String configName = (String) properties.get(COLL_CONF);
+    String restoreConfigName = message.getStr(COLL_CONF, configName);
+    if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
+      log.info("Using existing config {}", restoreConfigName);
+      //TODO add overwrite option?
+    } else {
+      log.info("Uploading config {}", restoreConfigName);
+      backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
+    }
+
+    log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
+        location);
+
+    //Create core-less collection
+    {
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
+      propMap.put("fromApi", "true"); // mostly true.  Prevents autoCreated=true in the collection state.
+
+      // inherit settings from input API, defaulting to the backup's setting.  Ex: replicationFactor
+      for (String collProp : COLL_PROPS.keySet()) {
+        Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
+        if (val != null) {
+          propMap.put(collProp, val);
+        }
+      }
+
+      propMap.put(NAME, restoreCollectionName);
+      propMap.put(CREATE_NODE_SET, CREATE_NODE_SET_EMPTY); //no cores
+      propMap.put(COLL_CONF, restoreConfigName);
+
+      // router.*
+      @SuppressWarnings("unchecked")
+      Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
+      for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
+        propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
+      }
+
+      Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
+      if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
+        propMap.put(SHARDS_PROP, StrUtils.join(sliceNames, ','));
+      } else {
+        propMap.put(NUM_SLICES, sliceNames.size());
+        // ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
+        //   list of names, and if so uses this instead of building it.  We clear the replica list.
+        Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
+        Map<String, Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
+        for (Slice backupSlice : backupSlices) {
+          newSlices.put(backupSlice.getName(),
+              new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
+        }
+        propMap.put(SHARDS_PROP, newSlices);
+      }
+
+      ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
+      // note: when createCollection() returns, the collection exists (no race)
+    }
+
+    DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+    DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+
+    //Mark all shards in CONSTRUCTION STATE while we restore the data
+    {
+      //TODO might instead createCollection accept an initial state?  Is there a race?
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+      for (Slice shard : restoreCollection.getSlices()) {
+        propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
+      }
+      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
+      inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+    }
+
+    // TODO how do we leverage the CREATE_NODE_SET / RULE / SNITCH logic in createCollection?
+
+    ClusterState clusterState = zkStateReader.getClusterState();
+    //Create one replica per shard and copy backed up data to it
+    for (Slice slice : restoreCollection.getSlices()) {
+      log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
+      HashMap<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
+      propMap.put(COLLECTION_PROP, restoreCollectionName);
+      propMap.put(SHARD_ID_PROP, slice.getName());
+      // add async param
+      if (asyncId != null) {
+        propMap.put(ASYNC, asyncId);
+      }
+      ocmh.addPropertyParams(message, propMap);
+
+      ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
+    }
+
+    //refresh the local copy of collection state
+    restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+    //Copy data from backed up index to each replica
+    for (Slice slice : restoreCollection.getSlices()) {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
+      params.set(NAME, "snapshot." + slice.getName());
+      params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath());
+      params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+
+      ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+    }
+    ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
+
+    //Mark all shards in ACTIVE STATE
+    {
+      HashMap<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
+      for (Slice shard : restoreCollection.getSlices()) {
+        propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
+      }
+      inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+    }
+
+    //refresh the local copy of collection state
+    restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+    //Add the remaining replicas for each shard
+    Integer numReplicas = restoreCollection.getReplicationFactor();
+    if (numReplicas != null && numReplicas > 1) {
+      log.info("Adding replicas to restored collection={}", restoreCollection);
+
+      for (Slice slice : restoreCollection.getSlices()) {
+        for (int i = 1; i < numReplicas; i++) {
+          log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
+          HashMap<String, Object> propMap = new HashMap<>();
+          propMap.put(COLLECTION_PROP, restoreCollectionName);
+          propMap.put(SHARD_ID_PROP, slice.getName());
+          // add async param
+          if (asyncId != null) {
+            propMap.put(ASYNC, asyncId);
+          }
+          ocmh.addPropertyParams(message, propMap);
+
+          ocmh.addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
+        }
+      }
+    }
+
+    log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
+  }
+}
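
For context, a hedged sketch of the message this command consumes: the key
names are taken from call() above, the literal values are invented, and
ocmh and clusterState are assumed to be in scope:

    // Illustrative RESTORE message only, not part of the commit.
    Map<String, Object> props = new HashMap<>();
    props.put(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.RESTORE.toLower());
    props.put(ZkStateReader.COLLECTION_PROP, "restoredCollection"); // target collection
    props.put(CommonParams.NAME, "myBackupName");                   // name of the backup
    props.put(CoreAdminParams.BACKUP_LOCATION, "/path/to/backups");
    props.put(CoreAdminParams.BACKUP_REPOSITORY, "local");          // optional repository name
    new RestoreCmd(ocmh).call(clusterState, new ZkNodeProps(props), new NamedList());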

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
new file mode 100644
index 0000000..d7bbf66
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/SplitShardCmd.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.PlainIdRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+
+public class SplitShardCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public SplitShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    split(state, message, results);
+  }
+
+  public boolean split(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr("collection");
+    String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+    log.info("Split shard invoked");
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+
+    String splitKey = message.getStr("split.key");
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    DocCollection collection = clusterState.getCollection(collectionName);
+    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
+
+    Slice parentSlice;
+
+    if (slice == null) {
+      if (router instanceof CompositeIdRouter) {
+        Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
+        if (searchSlices.isEmpty()) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
+        }
+        if (searchSlices.size() > 1) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
+        }
+        parentSlice = searchSlices.iterator().next();
+        slice = parentSlice.getName();
+        log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
+      } else {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
+                + router.getClass().getName());
+      }
+    } else {
+      parentSlice = collection.getSlice(slice);
+    }
+
+    if (parentSlice == null) {
+      // no chance of the collection being null because ClusterState#getCollection(String) would have thrown
+      // an exception already
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
+    }
+
+    // find the leader for the shard
+    Replica parentShardLeader = null;
+    try {
+      parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
+    DocRouter.Range range = parentSlice.getRange();
+    if (range == null) {
+      range = new PlainIdRouter().fullRange();
+    }
+
+    List<DocRouter.Range> subRanges = null;
+    String rangesStr = message.getStr(CoreAdminParams.RANGES);
+    if (rangesStr != null) {
+      String[] ranges = rangesStr.split(",");
+      if (ranges.length == 0 || ranges.length == 1) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
+      } else {
+        subRanges = new ArrayList<>(ranges.length);
+        for (int i = 0; i < ranges.length; i++) {
+          String r = ranges[i];
+          try {
+            subRanges.add(DocRouter.DEFAULT.fromString(r));
+          } catch (Exception e) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
+          }
+          if (!subRanges.get(i).isSubsetOf(range)) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+                "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
+          }
+        }
+        List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
+        Collections.sort(temp);
+        if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
+        }
+        for (int i = 1; i < temp.size(); i++) {
+          if (temp.get(i - 1).max + 1 != temp.get(i).min) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
+                + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
+          }
+        }
+      }
+    } else if (splitKey != null) {
+      if (router instanceof CompositeIdRouter) {
+        CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
+        subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
+        if (subRanges.size() == 1) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
+              + " has a hash range that is exactly equal to hash range of shard: " + slice);
+        }
+        for (DocRouter.Range subRange : subRanges) {
+          if (subRange.min == subRange.max) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
+          }
+        }
+        log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
+        rangesStr = "";
+        for (int i = 0; i < subRanges.size(); i++) {
+          DocRouter.Range subRange = subRanges.get(i);
+          rangesStr += subRange.toString();
+          if (i < subRanges.size() - 1) rangesStr += ',';
+        }
+      }
+    } else {
+      // todo: fixed to two partitions?
+      subRanges = router.partitionRange(2, range);
+    }
+
+    try {
+      List<String> subSlices = new ArrayList<>(subRanges.size());
+      List<String> subShardNames = new ArrayList<>(subRanges.size());
+      String nodeName = parentShardLeader.getNodeName();
+      for (int i = 0; i < subRanges.size(); i++) {
+        String subSlice = slice + "_" + i;
+        subSlices.add(subSlice);
+        String subShardName = collectionName + "_" + subSlice + "_replica1";
+        subShardNames.add(subShardName);
+
+        Slice oSlice = collection.getSlice(subSlice);
+        if (oSlice != null) {
+          final Slice.State state = oSlice.getState();
+          if (state == Slice.State.ACTIVE) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+                "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
+          } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
+            // delete the shards
+            for (String sub : subSlices) {
+              log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
+              Map<String, Object> propMap = new HashMap<>();
+              propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
+              propMap.put(COLLECTION_PROP, collectionName);
+              propMap.put(SHARD_ID_PROP, sub);
+              ZkNodeProps m = new ZkNodeProps(propMap);
+              try {
+                ocmh.commandMap.get(DELETESHARD).call(clusterState, m, new NamedList());
+              } catch (Exception e) {
+                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub,
+                    e);
+              }
+            }
+          }
+        }
+      }
+
+      final String asyncId = message.getStr(ASYNC);
+      Map<String, String> requestMap = new HashMap<>();
+
+      for (int i = 0; i < subRanges.size(); i++) {
+        String subSlice = subSlices.get(i);
+        String subShardName = subShardNames.get(i);
+        DocRouter.Range subRange = subRanges.get(i);
+
+        log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
+
+        Map<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
+        propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
+        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+        propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
+        propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
+        propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
+        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+        inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+
+        // wait until we are able to see the new shard in cluster state
+        ocmh.waitForNewShard(collectionName, subSlice);
+
+        // refresh cluster state
+        clusterState = zkStateReader.getClusterState();
+
+        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
+            + " on " + nodeName);
+        propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+        propMap.put(COLLECTION_PROP, collectionName);
+        propMap.put(SHARD_ID_PROP, subSlice);
+        propMap.put("node", nodeName);
+        propMap.put(CoreAdminParams.NAME, subShardName);
+        // copy over property params:
+        for (String key : message.keySet()) {
+          if (key.startsWith(COLL_PROP_PREFIX)) {
+            propMap.put(key, message.getStr(key));
+          }
+        }
+        // add async param
+        if (asyncId != null) {
+          propMap.put(ASYNC, asyncId);
+        }
+        ocmh.addReplica(clusterState, new ZkNodeProps(propMap), results, null);
+      }
+
+      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
+
+      for (String subShardName : subShardNames) {
+        // wait for parent leader to acknowledge the sub-shard core
+        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
+        String coreNodeName = ocmh.waitForCoreNodeName(collectionName, nodeName, subShardName);
+        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+        cmd.setCoreName(subShardName);
+        cmd.setNodeName(nodeName);
+        cmd.setCoreNodeName(coreNodeName);
+        cmd.setState(Replica.State.ACTIVE);
+        cmd.setCheckLive(true);
+        cmd.setOnlyIfLeader(true);
+
+        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
+        ocmh.sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
+      }
+
+      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
+          asyncId, requestMap);
+
+      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
+          + " on: " + parentShardLeader);
+
+      log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
+          + collectionName + " on " + parentShardLeader);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+      params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
+      for (int i = 0; i < subShardNames.size(); i++) {
+        String subShardName = subShardNames.get(i);
+        params.add(CoreAdminParams.TARGET_CORE, subShardName);
+      }
+      params.set(CoreAdminParams.RANGES, rangesStr);
+
+      ocmh.sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
+          requestMap);
+
+      log.info("Index on shard: " + nodeName + " split into two successfully");
+
+      // apply buffered updates on sub-shards
+      for (int i = 0; i < subShardNames.size(); i++) {
+        String subShardName = subShardNames.get(i);
+
+        log.info("Applying buffered updates on : " + subShardName);
+
+        params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+        params.set(CoreAdminParams.NAME, subShardName);
+
+        ocmh.sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
+      }
+
+      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
+          " to apply buffered updates", asyncId, requestMap);
+
+      log.info("Successfully applied buffered updates on : " + subShardNames);
+
+      // Replica creation for the new Slices
+
+      // look at the replication factor and see if it matches reality
+      // if it does not, find best nodes to create more cores
+
+      // TODO: Have replication factor decided in some other way instead of numShards for the parent
+
+      int repFactor = parentSlice.getReplicas().size();
+
+      // we need to look at every node and see how many cores it serves
+      // add our new cores to existing nodes serving the least number of cores
+      // but (for now) require that each core goes on a distinct node.
+
+      // TODO: add smarter options that look at the current number of cores per
+      // node?
+      // for now we just go random
+      Set<String> nodes = clusterState.getLiveNodes();
+      List<String> nodeList = new ArrayList<>(nodes.size());
+      nodeList.addAll(nodes);
+
+      // TODO: Have maxShardsPerNode param for this operation?
+
+      // Remove the node that hosts the parent shard for replica creation.
+      nodeList.remove(nodeName);
+
+      // TODO: change this to handle sharding a slice into > 2 sub-shards.
+
+
+      Map<ReplicaAssigner.Position, String> nodeMap = ocmh.identifyNodes(clusterState,
+          new ArrayList<>(clusterState.getLiveNodes()),
+          new ZkNodeProps(collection.getProperties()),
+          subSlices, repFactor - 1);
+
+      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
+
+      for (Map.Entry<ReplicaAssigner.Position, String> entry : nodeMap.entrySet()) {
+        String sliceName = entry.getKey().shard;
+        String subShardNodeName = entry.getValue();
+        String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index);
+
+        log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
+            + collectionName + " on " + subShardNodeName);
+
+        ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+            ZkStateReader.COLLECTION_PROP, collectionName,
+            ZkStateReader.SHARD_ID_PROP, sliceName,
+            ZkStateReader.CORE_NAME_PROP, shardName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
+            ZkStateReader.NODE_NAME_PROP, subShardNodeName);
+        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+        propMap.put(COLLECTION_PROP, collectionName);
+        propMap.put(SHARD_ID_PROP, sliceName);
+        propMap.put("node", subShardNodeName);
+        propMap.put(CoreAdminParams.NAME, shardName);
+        // copy over property params:
+        for (String key : message.keySet()) {
+          if (key.startsWith(COLL_PROP_PREFIX)) {
+            propMap.put(key, message.getStr(key));
+          }
+        }
+        // add async param
+        if (asyncId != null) {
+          propMap.put(ASYNC, asyncId);
+        }
+        // special flag param to instruct addReplica not to create the replica in cluster state again
+        propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
+
+        replicas.add(propMap);
+      }
+
+      // we must set the slice state into recovery before actually creating the replica cores
+      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
+      // always gets a chance to execute. See SOLR-7673
+
+      if (repFactor == 1) {
+        // switch sub shard states to 'active'
+        log.info("Replication factor is 1 so switching shard states");
+        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+        Map<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+        propMap.put(slice, Slice.State.INACTIVE.toString());
+        for (String subSlice : subSlices) {
+          propMap.put(subSlice, Slice.State.ACTIVE.toString());
+        }
+        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+        ZkNodeProps m = new ZkNodeProps(propMap);
+        inQueue.offer(Utils.toJSON(m));
+      } else {
+        log.info("Requesting shard state be set to 'recovery'");
+        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+        Map<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+        for (String subSlice : subSlices) {
+          propMap.put(subSlice, Slice.State.RECOVERY.toString());
+        }
+        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+        ZkNodeProps m = new ZkNodeProps(propMap);
+        inQueue.offer(Utils.toJSON(m));
+      }
+
+      // now actually create replica cores on sub shard nodes
+      for (Map<String, Object> replica : replicas) {
+        ocmh.addReplica(clusterState, new ZkNodeProps(replica), results, null);
+      }
+
+      ocmh.processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
+
+      log.info("Successfully created all replica shards for all sub-slices " + subSlices);
+
+      ocmh.commit(results, slice, parentShardLeader);
+
+      return true;
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, e);
+    }
+  }
+}
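
A worked example of the range validation above: the sorted sub-ranges must
tile the parent range exactly, each starting where the previous one ended.
A minimal sketch with made-up ranges, reusing only types and calls that
appear in the code above:

    // The full hash range of a shard split into two contiguous halves.
    DocRouter.Range parent = new PlainIdRouter().fullRange();     // 80000000-7fffffff
    List<DocRouter.Range> subs = new ArrayList<>();
    subs.add(DocRouter.DEFAULT.fromString("80000000-ffffffff"));
    subs.add(DocRouter.DEFAULT.fromString("0-7fffffff"));
    Collections.sort(subs);
    // Coverage check, as in split(): min of first and max of last must equal the parent.
    boolean covers = parent.equals(new DocRouter.Range(subs.get(0).min, subs.get(subs.size() - 1).max));
    // Contiguity check: subs.get(i - 1).max + 1 == subs.get(i).min for every i >= 1.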


[2/3] lucene-solr:master: SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes

Posted by no...@apache.org.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index 49e0942..36b5105 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -19,28 +19,19 @@ package org.apache.solr.cloud;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.net.URI;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang.StringUtils;
@@ -49,93 +40,59 @@ import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.Assign.ReplicaCount;
-import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.ReplicaAssigner.Position;
 import org.apache.solr.cloud.rule.Rule;
-import org.apache.solr.common.NonExistentCoreException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.Aliases;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.PlainIdRouter;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.RoutingRule;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.backup.BackupManager;
-import org.apache.solr.core.backup.repository.BackupRepository;
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.handler.component.ShardHandlerFactory;
 import org.apache.solr.handler.component.ShardRequest;
 import org.apache.solr.handler.component.ShardResponse;
-import org.apache.solr.update.SolrIndexSplitter;
 import org.apache.solr.util.DefaultSolrThreadFactory;
 import org.apache.solr.util.RTimer;
 import org.apache.solr.util.TimeOut;
-import org.apache.solr.util.stats.Snapshot;
-import org.apache.solr.util.stats.Timer;
-import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
 import static org.apache.solr.common.cloud.DocCollection.SNITCH;
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETENODE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATESTATEFORMAT;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REPLACENODE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.StrUtils.formatString;
 import static org.apache.solr.common.util.Utils.makeMap;
 
 /**
@@ -167,7 +124,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
   public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
 
-  private static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
+  static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
 
   public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
       ROUTER, DocRouter.DEFAULT_NAME,
@@ -179,13 +136,12 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private Overseer overseer;
-  private ShardHandlerFactory shardHandlerFactory;
-  private String adminPath;
+  Overseer overseer;
+  ShardHandlerFactory shardHandlerFactory;
+  String adminPath;
   ZkStateReader zkStateReader;
-  private String myId;
-  private Overseer.Stats stats;
-  private OverseerNodePrioritizer overseerPrioritizer;
+  String myId;
+  Overseer.Stats stats;
 
   // Set that tracks collections that are currently being processed by a running task.
   // This is used for handling mutual exclusion of the tasks.
@@ -206,7 +162,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       RANDOM = new Random(seed.hashCode());
     }
   }
-  private final Map<CollectionParams.CollectionAction, Cmd> commandMap;
+
+  final Map<CollectionAction, Cmd> commandMap;
 
   public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
                                         final ShardHandlerFactory shardHandlerFactory,
@@ -220,108 +177,52 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     this.myId = myId;
     this.stats = stats;
     this.overseer = overseer;
-    this.overseerPrioritizer = overseerPrioritizer;
-    commandMap = new ImmutableMap.Builder<CollectionParams.CollectionAction, Cmd>()
+    commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
         .put(REPLACENODE, new ReplaceNodeCmd(this))
         .put(DELETENODE, new DeleteNodeCmd(this))
+        .put(BACKUP, new BackupCmd(this))
+        .put(RESTORE, new RestoreCmd(this))
+        .put(SPLITSHARD, new SplitShardCmd(this))
+        .put(ADDROLE, new OverseerRoleCmd(this, ADDROLE, overseerPrioritizer))
+        .put(REMOVEROLE, new OverseerRoleCmd(this, REMOVEROLE, overseerPrioritizer))
+        .put(MOCK_COLL_TASK, this::mockOperation)
+        .put(MOCK_SHARD_TASK, this::mockOperation)
+        .put(MOCK_REPLICA_TASK, this::mockOperation)
+        .put(MIGRATESTATEFORMAT, this::migrateStateFormat)
+        .put(CREATESHARD, new CreateShardCmd(this))
+        .put(MIGRATE, new MigrateCmd(this))
+        .put(CREATE, new CreateCollectionCmd(this))
+        .put(MODIFYCOLLECTION, this::modifyCollection)
+        .put(ADDREPLICAPROP, this::processReplicaAddPropertyCommand)
+        .put(DELETEREPLICAPROP, this::processReplicaDeletePropertyCommand)
+        .put(BALANCESHARDUNIQUE, this::balanceProperty)
+        .put(REBALANCELEADERS, this::processRebalanceLeaders)
+        .put(RELOAD, this::reloadCollection)
+        .put(DELETE, new DeleteCollectionCmd(this))
+        .put(CREATEALIAS, new CreateAliasCmd(this))
+        .put(DELETEALIAS, new DeleteAliasCmd(this))
+        .put(OVERSEERSTATUS, new OverseerStatusCmd(this))
+        .put(DELETESHARD, new DeleteShardCmd(this))
+        .put(DELETEREPLICA, new DeleteReplicaCmd(this))
+        .put(ADDREPLICA, new AddReplicaCmd(this))
         .build()
     ;
   }
 
   @Override
-  @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
   @SuppressWarnings("unchecked")
   public SolrResponse processMessage(ZkNodeProps message, String operation) {
     log.info("OverseerCollectionMessageHandler.processMessage : "+ operation + " , "+ message.toString());
 
     NamedList results = new NamedList();
     try {
-      CollectionParams.CollectionAction action = getCollectionAction(operation);
-      switch (action) {
-        case CREATE:
-          createCollection(zkStateReader.getClusterState(), message, results);
-          break;
-        case DELETE:
-          deleteCollection(message, results);
-          break;
-        case RELOAD:
-          reloadCollection(message, results);
-          break;
-        case CREATEALIAS:
-          createAlias(zkStateReader.getAliases(), message);
-          break;
-        case DELETEALIAS:
-          deleteAlias(zkStateReader.getAliases(), message);
-          break;
-        case SPLITSHARD:
-          splitShard(zkStateReader.getClusterState(), message, results);
-          break;
-        case DELETESHARD:
-          deleteShard(zkStateReader.getClusterState(), message, results);
-          break;
-        case CREATESHARD:
-          createShard(zkStateReader.getClusterState(), message, results);
-          break;
-        case DELETEREPLICA:
-          deleteReplica(zkStateReader.getClusterState(), message, results, null);
-          break;
-        case MIGRATE:
-          migrate(zkStateReader.getClusterState(), message, results);
-          break;
-        case ADDROLE:
-          processRoleCommand(message, operation);
-          break;
-        case REMOVEROLE:
-          processRoleCommand(message, operation);
-          break;
-        case ADDREPLICA:
-          addReplica(zkStateReader.getClusterState(), message, results, null);
-          break;
-        case OVERSEERSTATUS:
-          getOverseerStatus(message, results);
-          break;
-        case ADDREPLICAPROP:
-          processReplicaAddPropertyCommand(message);
-          break;
-        case DELETEREPLICAPROP:
-          processReplicaDeletePropertyCommand(message);
-          break;
-        case BALANCESHARDUNIQUE:
-          balanceProperty(message);
-          break;
-        case REBALANCELEADERS:
-          processRebalanceLeaders(message);
-          break;
-        case MODIFYCOLLECTION:
-          modifyCollection(message, results);
-          break;
-        case MIGRATESTATEFORMAT:
-          migrateStateFormat(message, results);
-          break;
-        case BACKUP:
-          processBackupAction(message, results);
-          break;
-        case RESTORE:
-          processRestoreAction(message, results);
-          break;
-        case MOCK_COLL_TASK:
-        case MOCK_SHARD_TASK:
-        case MOCK_REPLICA_TASK: {
-          //only for test purposes
-          Thread.sleep(message.getInt("sleep", 1));
-          log.info("MOCK_TASK_EXECUTED time {} data {}",System.currentTimeMillis(), Utils.toJSONString(message));
-          results.add("MOCK_FINISHED", System.currentTimeMillis());
-          break;
-        }
-        default: {
-          Cmd command = commandMap.get(action);
-          if (command != null) {
-            command.call(zkStateReader.getClusterState(), message, results);
-          } else {
-            throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
-                + operation);
-          }
-        }
+      CollectionAction action = getCollectionAction(operation);
+      Cmd command = commandMap.get(action);
+      if (command != null) {
+        command.call(zkStateReader.getClusterState(), message, results);
+      } else {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+            + operation);
       }
     } catch (Exception e) {
       String collName = message.getStr("collection");
@@ -343,19 +244,23 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     return new OverseerSolrResponse(results);
   }
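// Illustrative use of the collapsed dispatch, with hypothetical names (ocmh,
// "collection1"); the operation string is the wire-format action name that
// getCollectionAction below resolves to a CollectionAction constant.
//
//   ZkNodeProps msg = new ZkNodeProps(NAME, "collection1");
//   SolrResponse rsp = ocmh.processMessage(msg, "reload");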
 
-  private CollectionParams.CollectionAction getCollectionAction(String operation) {
-    CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
+  @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
+  private void mockOperation(ClusterState state, ZkNodeProps message, NamedList results) throws InterruptedException {
+    //only for test purposes
+    Thread.sleep(message.getInt("sleep", 1));
+    log.info("MOCK_TASK_EXECUTED time {} data {}", System.currentTimeMillis(), Utils.toJSONString(message));
+    results.add("MOCK_FINISHED", System.currentTimeMillis());
+  }
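// Any helper sharing this uniform (ClusterState, ZkNodeProps, NamedList)
// signature can be registered in the command map as a method reference; a
// sketch with hypothetical names (pingOperation, SOME_ACTION):
//
//   private void pingOperation(ClusterState state, ZkNodeProps message, NamedList results) {
//     results.add("PING_OK", message.getStr("collection"));
//   }
//   // registered in the constructor as: .put(SOME_ACTION, this::pingOperation)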
+
+  private CollectionAction getCollectionAction(String operation) {
+    CollectionAction action = CollectionAction.get(operation);
     if (action == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
     }
     return action;
   }
 
-  //
-  // TODO DWS: this class has gone out of control (too big); refactor to break it up
-  //
-
-  private void reloadCollection(ZkNodeProps message, NamedList results) {
+  private void reloadCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
 
@@ -368,7 +273,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
   }
 
   @SuppressWarnings("unchecked")
-  private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
+  private void processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
         CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
 
@@ -396,7 +302,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
   }
 
   @SuppressWarnings("unchecked")
-  private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
+  private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
     SolrZkClient zkClient = zkStateReader.getZkClient();
     DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
@@ -407,7 +314,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     inQueue.offer(Utils.toJSON(m));
   }
 
-  private void processReplicaDeletePropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
+  private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws KeeperException, InterruptedException {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
     SolrZkClient zkClient = zkStateReader.getZkClient();
     DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
@@ -418,7 +326,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     inQueue.offer(Utils.toJSON(m));
   }
 
-  private void balanceProperty(ZkNodeProps message) throws KeeperException, InterruptedException {
+  private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
     if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
           "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
@@ -432,81 +340,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
   }
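// The property handlers above share one shape: validate the required props,
// then enqueue a state-update message for the Overseer to apply. A sketch of
// that enqueue step, with an illustrative (not verbatim) operation name:
//
//   Map<String, Object> propMap = new HashMap<>();
//   propMap.put(Overseer.QUEUE_OPERATION, "addreplicaprop");
//   propMap.put(COLLECTION_PROP, "collection1");
//   propMap.put(SHARD_ID_PROP, "shard1");
//   Overseer.getStateUpdateQueue(zkStateReader.getZkClient())
//       .offer(Utils.toJSON(new ZkNodeProps(propMap)));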
 
-
-  @SuppressWarnings("unchecked")
-  private void getOverseerStatus(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
-    String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
-    results.add("leader", leaderNode);
-    Stat stat = new Stat();
-    zkStateReader.getZkClient().getData("/overseer/queue",null, stat, true);
-    results.add("overseer_queue_size", stat.getNumChildren());
-    stat = new Stat();
-    zkStateReader.getZkClient().getData("/overseer/queue-work",null, stat, true);
-    results.add("overseer_work_queue_size", stat.getNumChildren());
-    stat = new Stat();
-    zkStateReader.getZkClient().getData("/overseer/collection-queue-work",null, stat, true);
-    results.add("overseer_collection_queue_size", stat.getNumChildren());
-
-    NamedList overseerStats = new NamedList();
-    NamedList collectionStats = new NamedList();
-    NamedList stateUpdateQueueStats = new NamedList();
-    NamedList workQueueStats = new NamedList();
-    NamedList collectionQueueStats = new NamedList();
-    for (Map.Entry<String, Overseer.Stat> entry : stats.getStats().entrySet()) {
-      String key = entry.getKey();
-      NamedList<Object> lst = new SimpleOrderedMap<>();
-      if (key.startsWith("collection_"))  {
-        collectionStats.add(key.substring(11), lst);
-        int successes = stats.getSuccessCount(entry.getKey());
-        int errors = stats.getErrorCount(entry.getKey());
-        lst.add("requests", successes);
-        lst.add("errors", errors);
-        List<Overseer.FailedOp> failureDetails = stats.getFailureDetails(key);
-        if (failureDetails != null) {
-          List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
-          for (Overseer.FailedOp failedOp : failureDetails) {
-            SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
-            fail.add("request", failedOp.req.getProperties());
-            fail.add("response", failedOp.resp.getResponse());
-            failures.add(fail);
-          }
-          lst.add("recent_failures", failures);
-        }
-      } else if (key.startsWith("/overseer/queue_"))  {
-        stateUpdateQueueStats.add(key.substring(16), lst);
-      } else if (key.startsWith("/overseer/queue-work_"))  {
-        workQueueStats.add(key.substring(21), lst);
-      } else if (key.startsWith("/overseer/collection-queue-work_"))  {
-        collectionQueueStats.add(key.substring(32), lst);
-      } else  {
-        // overseer stats
-        overseerStats.add(key, lst);
-        int successes = stats.getSuccessCount(entry.getKey());
-        int errors = stats.getErrorCount(entry.getKey());
-        lst.add("requests", successes);
-        lst.add("errors", errors);
-      }
-      Timer timer = entry.getValue().requestTime;
-      Snapshot snapshot = timer.getSnapshot();
-      lst.add("totalTime", timer.getSum());
-      lst.add("avgRequestsPerMinute", timer.getMeanRate());
-      lst.add("5minRateRequestsPerMinute", timer.getFiveMinuteRate());
-      lst.add("15minRateRequestsPerMinute", timer.getFifteenMinuteRate());
-      lst.add("avgTimePerRequest", timer.getMean());
-      lst.add("medianRequestTime", snapshot.getMedian());
-      lst.add("75thPctlRequestTime", snapshot.get75thPercentile());
-      lst.add("95thPctlRequestTime", snapshot.get95thPercentile());
-      lst.add("99thPctlRequestTime", snapshot.get99thPercentile());
-      lst.add("999thPctlRequestTime", snapshot.get999thPercentile());
-    }
-    results.add("overseer_operations", overseerStats);
-    results.add("collection_operations", collectionStats);
-    results.add("overseer_queue", stateUpdateQueueStats);
-    results.add("overseer_internal_queue", workQueueStats);
-    results.add("collection_queue", collectionQueueStats);
-
-  }
-
   /**
   * Walks the tree of collection status to verify that any replica not reporting a "down" status is
   * on a live node, and that any replica reporting an "active" status while its node is not live is
@@ -573,138 +406,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
   }
 
   @SuppressWarnings("unchecked")
-  private void processRoleCommand(ZkNodeProps message, String operation) throws KeeperException, InterruptedException {
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    Map roles = null;
-    String node = message.getStr("node");
-
-    String roleName = message.getStr("role");
-    boolean nodeExists = false;
-    if(nodeExists = zkClient.exists(ZkStateReader.ROLES, true)){
-      roles = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true));
-    } else {
-      roles = new LinkedHashMap(1);
-    }
-
-    List nodeList= (List) roles.get(roleName);
-    if(nodeList == null) roles.put(roleName, nodeList = new ArrayList());
-    if(ADDROLE.toString().toLowerCase(Locale.ROOT).equals(operation) ){
-      log.info("Overseer role added to {}", node);
-      if(!nodeList.contains(node)) nodeList.add(node);
-    } else if(REMOVEROLE.toString().toLowerCase(Locale.ROOT).equals(operation)) {
-      log.info("Overseer role removed from {}", node);
-      nodeList.remove(node);
-    }
-
-    if(nodeExists){
-      zkClient.setData(ZkStateReader.ROLES, Utils.toJSON(roles),true);
-    } else {
-      zkClient.create(ZkStateReader.ROLES, Utils.toJSON(roles), CreateMode.PERSISTENT,true);
-    }
-    //if there are too many nodes this command may time out. And most likely dedicated
-    // overseers are created when there are too many nodes  . So , do this operation in a separate thread
-    new Thread(() -> {
-      try {
-        overseerPrioritizer.prioritizeOverseerNodes(myId);
-      } catch (Exception e) {
-        log.error("Error in prioritizing Overseer", e);
-      }
-
-    }).start();
-
-  }
-
-  @SuppressWarnings("unchecked")
   void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
-      throws KeeperException, InterruptedException {
-    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
-    String collectionName = message.getStr(COLLECTION_PROP);
-    String shard = message.getStr(SHARD_ID_PROP);
-    String replicaName = message.getStr(REPLICA_PROP);
-    boolean parallel = message.getBool("parallel", false);
-    
-    DocCollection coll = clusterState.getCollection(collectionName);
-    Slice slice = coll.getSlice(shard);
-    if (slice == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          "Invalid shard name : " + shard + " in collection : " + collectionName);
-    }
-    Replica replica = slice.getReplica(replicaName);
-    if (replica == null) {
-      ArrayList<String> l = new ArrayList<>();
-      for (Replica r : slice.getReplicas())
-        l.add(r.getName());
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
-          + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
-    }
-    
-    // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
-    // on the command.
-    if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
-              + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
-    }
-
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-    String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-    String asyncId = message.getStr(ASYNC);
-    AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
-    if (asyncId != null) {
-      requestMap.set(new HashMap<>(1, 1.0f));
-    }
-
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.add(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-    params.add(CoreAdminParams.CORE, core);
-
-    params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
-    params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
-    params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
-
-    boolean isLive = zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
-    if (isLive) {
-      sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
-    }
-
-    Callable<Boolean> callable = () -> {
-      try {
-        if (isLive) {
-          processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
-
-          //check if the core unload removed the corenode zk entry
-          if (waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
-        }
+      throws Exception {
+    ((DeleteReplicaCmd) commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, message, results, onComplete);
 
-        // try and ensure core info is removed from cluster state
-        deleteCoreNode(collectionName, replicaName, replica, core);
-        if (waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
-        return Boolean.FALSE;
-      } catch (Exception e) {
-        results.add("failure", "Could not complete delete " + e.getMessage());
-        throw e;
-      } finally {
-        if (onComplete != null) onComplete.run();
-      }
-    };
-
-    if (!parallel) {
-      try {
-        if (!callable.call())
-          throw new SolrException(ErrorCode.SERVER_ERROR,
-              "Could not  remove replica : " + collectionName + "/" + shard + "/" + replicaName);
-      } catch (InterruptedException | KeeperException e) {
-        throw e;
-      } catch (Exception ex) {
-        throw new SolrException(ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
-      }
-
-    } else {
-      tpe.submit(callable);
-    }
   }
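// deleteReplica is kept as a thin shim, presumably so in-class callers keep
// working while the logic moves to DeleteReplicaCmd. An assumed skeleton of
// such an extracted command (not the actual source), holding a back-reference
// to the handler so it can reuse the now package-visible helpers below:
//
//   public class DeleteReplicaCmd implements Cmd {
//     private final OverseerCollectionMessageHandler ocmh;
//     public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) { this.ocmh = ocmh; }
//     @Override
//     public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
//       deleteReplica(state, message, results, null);
//     }
//     void deleteReplica(ClusterState state, ZkNodeProps message, NamedList results,
//         Runnable onComplete) throws Exception {
//       // body moved out of this class; may call ocmh.waitForCoreNodeGone(...) etc.
//     }
//   }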
 
-  private boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
+  boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
     TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
     boolean deleted = false;
     while (! timeout.hasTimedOut()) {
@@ -722,7 +430,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     return deleted;
   }
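// waitForCoreNodeGone follows the TimeOut polling idiom used throughout this
// class; a condensed sketch (conditionMet() is hypothetical):
//
//   TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS);
//   while (!timeout.hasTimedOut()) {
//     Thread.sleep(100);
//     if (conditionMet()) return true;
//   }
//   return false;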
 
-  private void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
+  void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws KeeperException, InterruptedException {
     ZkNodeProps m = new ZkNodeProps(
         Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
         ZkStateReader.CORE_NAME_PROP, core,
@@ -741,73 +449,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
   }
 
-  private void deleteCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
-    final String collection = message.getStr(NAME);
-    try {
-      if (zkStateReader.getClusterState().getCollectionOrNull(collection) == null) {
-        if (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
-          // if the collection is not in the clusterstate, but is listed in zk, do nothing, it will just
-          // be removed in the finally - we cannot continue, because the below code will error if the collection
-          // is not in the clusterstate
-          return;
-        }
-      }
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
-      params.set(CoreAdminParams.DELETE_DATA_DIR, true);
-
-      String asyncId = message.getStr(ASYNC);
-      Map<String, String> requestMap = null;
-      if (asyncId != null) {
-        requestMap = new HashMap<>();
-      }
-      
-      Set<String> okayExceptions = new HashSet<>(1);
-      okayExceptions.add(NonExistentCoreException.class.getName());
-      
-      collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
-
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
-      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-
-      // wait for a while until we don't see the collection
-      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
-      boolean removed = false;
-      while (! timeout.hasTimedOut()) {
-        Thread.sleep(100);
-        removed = !zkStateReader.getClusterState().hasCollection(collection);
-        if (removed) {
-          Thread.sleep(500); // just a bit of time so it's more likely other
-                             // readers see on return
-          break;
-        }
-      }
-      if (!removed) {
-        throw new SolrException(ErrorCode.SERVER_ERROR,
-            "Could not fully remove collection: " + collection);
-      }
-
-    } finally {
-
-      try {
-        if (zkStateReader.getZkClient().exists(
-            ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
-          zkStateReader.getZkClient().clean(
-              ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
-        }
-      } catch (InterruptedException e) {
-        SolrException.log(log, "Cleaning up collection in zk was interrupted:"
-            + collection, e);
-        Thread.currentThread().interrupt();
-      } catch (KeeperException e) {
-        SolrException.log(log, "Problem cleaning up collection in zk:"
-            + collection, e);
-      }
-    }
-  }
-
-  private void migrateStateFormat(ZkNodeProps message, NamedList results)
+  // TODO: should we remove this in the next release?
+  private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results)
       throws KeeperException, InterruptedException {
     final String collectionName = message.getStr(COLLECTION_PROP);
 
@@ -836,549 +479,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
   }
 
-  private void createAlias(Aliases aliases, ZkNodeProps message) {
-    String aliasName = message.getStr(NAME);
-    String collections = message.getStr("collections");
-    
-    Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
-    Map<String,String> newCollectionAliasesMap = new HashMap<>();
-    Map<String,String> prevColAliases = aliases.getCollectionAliasMap();
-    if (prevColAliases != null) {
-      newCollectionAliasesMap.putAll(prevColAliases);
-    }
-    newCollectionAliasesMap.put(aliasName, collections);
-    newAliasesMap.put("collection", newCollectionAliasesMap);
-    Aliases newAliases = new Aliases(newAliasesMap);
-    byte[] jsonBytes = null;
-    if (newAliases.collectionAliasSize() > 0) { // only sub map right now
-      jsonBytes = Utils.toJSON(newAliases.getAliasMap());
-    }
-    try {
-      zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
-      
-      checkForAlias(aliasName, collections);
-      // some fudge for other nodes
-      Thread.sleep(100);
-    } catch (KeeperException e) {
-      log.error("", e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, e);
-    } catch (InterruptedException e) {
-      log.warn("", e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, e);
-    }
-  }
-
-  private void checkForAlias(String name, String value) {
-
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
-    boolean success = false;
-    Aliases aliases;
-    while (! timeout.hasTimedOut()) {
-      aliases = zkStateReader.getAliases();
-      String collections = aliases.getCollectionAlias(name);
-      if (collections != null && collections.equals(value)) {
-        success = true;
-        break;
-      }
-    }
-    if (!success) {
-      log.warn("Timeout waiting to be notified of Alias change...");
-    }
-  }
-
-  private void checkForAliasAbsence(String name) {
-
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
-    boolean success = false;
-    Aliases aliases = null;
-    while (! timeout.hasTimedOut()) {
-      aliases = zkStateReader.getAliases();
-      String collections = aliases.getCollectionAlias(name);
-      if (collections == null) {
-        success = true;
-        break;
-      }
-    }
-    if (!success) {
-      log.warn("Timeout waiting to be notified of Alias change...");
-    }
-  }
-
-  private void deleteAlias(Aliases aliases, ZkNodeProps message) {
-    String aliasName = message.getStr(NAME);
-
-    Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
-    Map<String,String> newCollectionAliasesMap = new HashMap<>();
-    newCollectionAliasesMap.putAll(aliases.getCollectionAliasMap());
-    newCollectionAliasesMap.remove(aliasName);
-    newAliasesMap.put("collection", newCollectionAliasesMap);
-    Aliases newAliases = new Aliases(newAliasesMap);
-    byte[] jsonBytes = null;
-    if (newAliases.collectionAliasSize() > 0) { // only sub map right now
-      jsonBytes  = Utils.toJSON(newAliases.getAliasMap());
-    }
-    try {
-      zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
-          jsonBytes, true);
-      checkForAliasAbsence(aliasName);
-      // some fudge for other nodes
-      Thread.sleep(100);
-    } catch (KeeperException e) {
-      log.error("", e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, e);
-    } catch (InterruptedException e) {
-      log.warn("", e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, e);
-    }
-
-  }
-
-  private boolean createShard(ClusterState clusterState, ZkNodeProps message, NamedList results)
-      throws KeeperException, InterruptedException {
-    String collectionName = message.getStr(COLLECTION_PROP);
-    String sliceName = message.getStr(SHARD_ID_PROP);
-
-    log.info("Create shard invoked: {}", message);
-    if (collectionName == null || sliceName == null)
-      throw new SolrException(ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
-    int numSlices = 1;
-    
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-    DocCollection collection = clusterState.getCollection(collectionName);
-    int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
-    String createNodeSetStr = message.getStr(CREATE_NODE_SET);
-    List<ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor,
-        createNodeSetStr, overseer.getZkController().getCoreContainer());
-        
-    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
-    // wait for a while until we see the shard
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
-    boolean created = false;
-    while (! timeout.hasTimedOut()) {
-      Thread.sleep(100);
-      created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
-      if (created) break;
-    }
-    if (!created)
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
-      
-    String configName = message.getStr(COLL_CONF);
-
-    String async = message.getStr(ASYNC);
-    Map<String, String> requestMap = null;
-    if (async != null) {
-      requestMap = new HashMap<>(repFactor, 1.0f);
-    }
-
-    for (int j = 1; j <= repFactor; j++) {
-      String nodeName = sortedNodeList.get(((j - 1)) % sortedNodeList.size()).nodeName;
-      String shardName = collectionName + "_" + sliceName + "_replica" + j;
-      log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
-          + " on " + nodeName);
-          
-      // Need to create new params for each request
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-      params.set(CoreAdminParams.NAME, shardName);
-      params.set(COLL_CONF, configName);
-      params.set(CoreAdminParams.COLLECTION, collectionName);
-      params.set(CoreAdminParams.SHARD, sliceName);
-      params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
-      addPropertyParams(message, params);
-
-      sendShardRequest(nodeName, params, shardHandler, async, requestMap);
-    }
-    
-    processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap, Collections.emptySet());
-    
-    log.info("Finished create command on all shards for collection: " + collectionName);
-    
-    return true;
-  }
-
-
-  private boolean splitShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
-    String collectionName = message.getStr("collection");
-    String slice = message.getStr(ZkStateReader.SHARD_ID_PROP);
-    
-    log.info("Split shard invoked");
-    String splitKey = message.getStr("split.key");
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-    
-    DocCollection collection = clusterState.getCollection(collectionName);
-    DocRouter router = collection.getRouter() != null ? collection.getRouter() : DocRouter.DEFAULT;
-    
-    Slice parentSlice;
-    
-    if (slice == null) {
-      if (router instanceof CompositeIdRouter) {
-        Collection<Slice> searchSlices = router.getSearchSlicesSingle(splitKey, new ModifiableSolrParams(), collection);
-        if (searchSlices.isEmpty()) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Unable to find an active shard for split.key: " + splitKey);
-        }
-        if (searchSlices.size() > 1) {
-          throw new SolrException(ErrorCode.BAD_REQUEST,
-              "Splitting a split.key: " + splitKey + " which spans multiple shards is not supported");
-        }
-        parentSlice = searchSlices.iterator().next();
-        slice = parentSlice.getName();
-        log.info("Split by route.key: {}, parent shard is: {} ", splitKey, slice);
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST,
-            "Split by route key can only be used with CompositeIdRouter or subclass. Found router: "
-                + router.getClass().getName());
-      }
-    } else {
-      parentSlice = collection.getSlice(slice);
-    }
-    
-    if (parentSlice == null) {
-      // no chance of the collection being null because ClusterState#getCollection(String) would have thrown
-      // an exception already
-      throw new SolrException(ErrorCode.BAD_REQUEST, "No shard with the specified name exists: " + slice);
-    }
-    
-    // find the leader for the shard
-    Replica parentShardLeader = null;
-    try {
-      parentShardLeader = zkStateReader.getLeaderRetry(collectionName, slice, 10000);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    
-    DocRouter.Range range = parentSlice.getRange();
-    if (range == null) {
-      range = new PlainIdRouter().fullRange();
-    }
-    
-    List<DocRouter.Range> subRanges = null;
-    String rangesStr = message.getStr(CoreAdminParams.RANGES);
-    if (rangesStr != null) {
-      String[] ranges = rangesStr.split(",");
-      if (ranges.length == 0 || ranges.length == 1) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "There must be at least two ranges specified to split a shard");
-      } else {
-        subRanges = new ArrayList<>(ranges.length);
-        for (int i = 0; i < ranges.length; i++) {
-          String r = ranges[i];
-          try {
-            subRanges.add(DocRouter.DEFAULT.fromString(r));
-          } catch (Exception e) {
-            throw new SolrException(ErrorCode.BAD_REQUEST, "Exception in parsing hexadecimal hash range: " + r, e);
-          }
-          if (!subRanges.get(i).isSubsetOf(range)) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,
-                "Specified hash range: " + r + " is not a subset of parent shard's range: " + range.toString());
-          }
-        }
-        List<DocRouter.Range> temp = new ArrayList<>(subRanges); // copy to preserve original order
-        Collections.sort(temp);
-        if (!range.equals(new DocRouter.Range(temp.get(0).min, temp.get(temp.size() - 1).max))) {
-          throw new SolrException(ErrorCode.BAD_REQUEST,
-              "Specified hash ranges: " + rangesStr + " do not cover the entire range of parent shard: " + range);
-        }
-        for (int i = 1; i < temp.size(); i++) {
-          if (temp.get(i - 1).max + 1 != temp.get(i).min) {
-            throw new SolrException(ErrorCode.BAD_REQUEST, "Specified hash ranges: " + rangesStr
-                + " either overlap with each other or " + "do not cover the entire range of parent shard: " + range);
-          }
-        }
-      }
-    } else if (splitKey != null) {
-      if (router instanceof CompositeIdRouter) {
-        CompositeIdRouter compositeIdRouter = (CompositeIdRouter) router;
-        subRanges = compositeIdRouter.partitionRangeByKey(splitKey, range);
-        if (subRanges.size() == 1) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey
-              + " has a hash range that is exactly equal to hash range of shard: " + slice);
-        }
-        for (DocRouter.Range subRange : subRanges) {
-          if (subRange.min == subRange.max) {
-            throw new SolrException(ErrorCode.BAD_REQUEST, "The split.key: " + splitKey + " must be a compositeId");
-          }
-        }
-        log.info("Partitioning parent shard " + slice + " range: " + parentSlice.getRange() + " yields: " + subRanges);
-        rangesStr = "";
-        for (int i = 0; i < subRanges.size(); i++) {
-          DocRouter.Range subRange = subRanges.get(i);
-          rangesStr += subRange.toString();
-          if (i < subRanges.size() - 1) rangesStr += ',';
-        }
-      }
-    } else {
-      // todo: fixed to two partitions?
-      subRanges = router.partitionRange(2, range);
-    }
-    
-    try {
-      List<String> subSlices = new ArrayList<>(subRanges.size());
-      List<String> subShardNames = new ArrayList<>(subRanges.size());
-      String nodeName = parentShardLeader.getNodeName();
-      for (int i = 0; i < subRanges.size(); i++) {
-        String subSlice = slice + "_" + i;
-        subSlices.add(subSlice);
-        String subShardName = collectionName + "_" + subSlice + "_replica1";
-        subShardNames.add(subShardName);
-        
-        Slice oSlice = collection.getSlice(subSlice);
-        if (oSlice != null) {
-          final Slice.State state = oSlice.getState();
-          if (state == Slice.State.ACTIVE) {
-            throw new SolrException(ErrorCode.BAD_REQUEST,
-                "Sub-shard: " + subSlice + " exists in active state. Aborting split shard.");
-          } else if (state == Slice.State.CONSTRUCTION || state == Slice.State.RECOVERY) {
-            // delete the shards
-            for (String sub : subSlices) {
-              log.info("Sub-shard: {} already exists therefore requesting its deletion", sub);
-              Map<String,Object> propMap = new HashMap<>();
-              propMap.put(Overseer.QUEUE_OPERATION, "deleteshard");
-              propMap.put(COLLECTION_PROP, collectionName);
-              propMap.put(SHARD_ID_PROP, sub);
-              ZkNodeProps m = new ZkNodeProps(propMap);
-              try {
-                deleteShard(clusterState, m, new NamedList());
-              } catch (Exception e) {
-                throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to delete already existing sub shard: " + sub,
-                    e);
-              }
-            }
-          }
-        }
-      }
-      
-      final String asyncId = message.getStr(ASYNC);
-      Map<String,String> requestMap = new HashMap<>();
-      
-      for (int i = 0; i < subRanges.size(); i++) {
-        String subSlice = subSlices.get(i);
-        String subShardName = subShardNames.get(i);
-        DocRouter.Range subRange = subRanges.get(i);
-        
-        log.info("Creating slice " + subSlice + " of collection " + collectionName + " on " + nodeName);
-        
-        Map<String,Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
-        propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        propMap.put(ZkStateReader.SHARD_RANGE_PROP, subRange.toString());
-        propMap.put(ZkStateReader.SHARD_STATE_PROP, Slice.State.CONSTRUCTION.toString());
-        propMap.put(ZkStateReader.SHARD_PARENT_PROP, parentSlice.getName());
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-        
-        // wait until we are able to see the new shard in cluster state
-        waitForNewShard(collectionName, subSlice);
-        
-        // refresh cluster state
-        clusterState = zkStateReader.getClusterState();
-        
-        log.info("Adding replica " + subShardName + " as part of slice " + subSlice + " of collection " + collectionName
-            + " on " + nodeName);
-        propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
-        propMap.put(COLLECTION_PROP, collectionName);
-        propMap.put(SHARD_ID_PROP, subSlice);
-        propMap.put("node", nodeName);
-        propMap.put(CoreAdminParams.NAME, subShardName);
-        // copy over property params:
-        for (String key : message.keySet()) {
-          if (key.startsWith(COLL_PROP_PREFIX)) {
-            propMap.put(key, message.getStr(key));
-          }
-        }
-        // add async param
-        if (asyncId != null) {
-          propMap.put(ASYNC, asyncId);
-        }
-        addReplica(clusterState, new ZkNodeProps(propMap), results, null);
-      }
-
-      processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard leaders", asyncId, requestMap);
-      
-      for (String subShardName : subShardNames) {
-        // wait for parent leader to acknowledge the sub-shard core
-        log.info("Asking parent leader to wait for: " + subShardName + " to be alive on: " + nodeName);
-        String coreNodeName = waitForCoreNodeName(collectionName, nodeName, subShardName);
-        CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-        cmd.setCoreName(subShardName);
-        cmd.setNodeName(nodeName);
-        cmd.setCoreNodeName(coreNodeName);
-        cmd.setState(Replica.State.ACTIVE);
-        cmd.setCheckLive(true);
-        cmd.setOnlyIfLeader(true);
-        
-        ModifiableSolrParams p = new ModifiableSolrParams(cmd.getParams());
-        sendShardRequest(nodeName, p, shardHandler, asyncId, requestMap);
-      }
-
-      processResponses(results, shardHandler, true, "SPLITSHARD timed out waiting for subshard leaders to come up",
-          asyncId, requestMap);
-      
-      log.info("Successfully created all sub-shards for collection " + collectionName + " parent shard: " + slice
-          + " on: " + parentShardLeader);
-          
-      log.info("Splitting shard " + parentShardLeader.getName() + " as part of slice " + slice + " of collection "
-          + collectionName + " on " + parentShardLeader);
-          
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
-      params.set(CoreAdminParams.CORE, parentShardLeader.getStr("core"));
-      for (int i = 0; i < subShardNames.size(); i++) {
-        String subShardName = subShardNames.get(i);
-        params.add(CoreAdminParams.TARGET_CORE, subShardName);
-      }
-      params.set(CoreAdminParams.RANGES, rangesStr);
-      
-      sendShardRequest(parentShardLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-      processResponses(results, shardHandler, true, "SPLITSHARD failed to invoke SPLIT core admin command", asyncId,
-          requestMap);
-      
-      log.info("Index on shard: " + nodeName + " split into two successfully");
-      
-      // apply buffered updates on sub-shards
-      for (int i = 0; i < subShardNames.size(); i++) {
-        String subShardName = subShardNames.get(i);
-        
-        log.info("Applying buffered updates on : " + subShardName);
-        
-        params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
-        params.set(CoreAdminParams.NAME, subShardName);
-        
-        sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
-      }
-
-      processResponses(results, shardHandler, true, "SPLITSHARD failed while asking sub shard leaders" +
-          " to apply buffered updates", asyncId, requestMap);
-      
-      log.info("Successfully applied buffered updates on : " + subShardNames);
-      
-      // Replica creation for the new Slices
-      
-      // look at the replication factor and see if it matches reality
-      // if it does not, find best nodes to create more cores
-      
-      // TODO: Have replication factor decided in some other way instead of numShards for the parent
-      
-      int repFactor = parentSlice.getReplicas().size();
-      
-      // we need to look at every node and see how many cores it serves
-      // add our new cores to existing nodes serving the least number of cores
-      // but (for now) require that each core goes on a distinct node.
-      
-      // TODO: add smarter options that look at the current number of cores per
-      // node?
-      // for now we just go random
-      Set<String> nodes = clusterState.getLiveNodes();
-      List<String> nodeList = new ArrayList<>(nodes.size());
-      nodeList.addAll(nodes);
-      
-      // TODO: Have maxShardsPerNode param for this operation?
-      
-      // Remove the node that hosts the parent shard for replica creation.
-      nodeList.remove(nodeName);
-      
-      // TODO: change this to handle sharding a slice into > 2 sub-shards.
-
-
-      Map<Position, String> nodeMap = identifyNodes(clusterState,
-          new ArrayList<>(clusterState.getLiveNodes()),
-          new ZkNodeProps(collection.getProperties()),
-          subSlices, repFactor - 1);
-
-      List<Map<String, Object>> replicas = new ArrayList<>((repFactor - 1) * 2);
-
-        for (Map.Entry<Position, String> entry : nodeMap.entrySet()) {
-          String sliceName = entry.getKey().shard;
-          String subShardNodeName = entry.getValue();
-          String shardName = collectionName + "_" + sliceName + "_replica" + (entry.getKey().index);
-
-          log.info("Creating replica shard " + shardName + " as part of slice " + sliceName + " of collection "
-              + collectionName + " on " + subShardNodeName);
-
-          ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
-              ZkStateReader.COLLECTION_PROP, collectionName,
-              ZkStateReader.SHARD_ID_PROP, sliceName,
-              ZkStateReader.CORE_NAME_PROP, shardName,
-              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(subShardNodeName),
-              ZkStateReader.NODE_NAME_PROP, subShardNodeName);
-          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
-
-          HashMap<String,Object> propMap = new HashMap<>();
-          propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
-          propMap.put(COLLECTION_PROP, collectionName);
-          propMap.put(SHARD_ID_PROP, sliceName);
-          propMap.put("node", subShardNodeName);
-          propMap.put(CoreAdminParams.NAME, shardName);
-          // copy over property params:
-          for (String key : message.keySet()) {
-            if (key.startsWith(COLL_PROP_PREFIX)) {
-              propMap.put(key, message.getStr(key));
-            }
-          }
-          // add async param
-          if (asyncId != null) {
-            propMap.put(ASYNC, asyncId);
-          }
-          // special flag param to instruct addReplica not to create the replica in cluster state again
-          propMap.put(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, "true");
-
-          replicas.add(propMap);
-        }
-
-      // we must set the slice state into recovery before actually creating the replica cores
-      // this ensures that the logic inside Overseer to update sub-shard state to 'active'
-      // always gets a chance to execute. See SOLR-7673
-
-      if (repFactor == 1) {
-        // switch sub shard states to 'active'
-        log.info("Replication factor is 1 so switching shard states");
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        Map<String,Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-        propMap.put(slice, Slice.State.INACTIVE.toString());
-        for (String subSlice : subSlices) {
-          propMap.put(subSlice, Slice.State.ACTIVE.toString());
-        }
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        ZkNodeProps m = new ZkNodeProps(propMap);
-        inQueue.offer(Utils.toJSON(m));
-      } else {
-        log.info("Requesting shard state be set to 'recovery'");
-        DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-        Map<String,Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-        for (String subSlice : subSlices) {
-          propMap.put(subSlice, Slice.State.RECOVERY.toString());
-        }
-        propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
-        ZkNodeProps m = new ZkNodeProps(propMap);
-        inQueue.offer(Utils.toJSON(m));
-      }
-
-      // now actually create replica cores on sub shard nodes
-      for (Map<String, Object> replica : replicas) {
-        addReplica(clusterState, new ZkNodeProps(replica), results, null);
-      }
-
-      processResponses(results, shardHandler, true, "SPLITSHARD failed to create subshard replicas", asyncId, requestMap);
-      
-      log.info("Successfully created all replica shards for all sub-slices " + subSlices);
-      
-      commit(results, slice, parentShardLeader);
-      
-      return true;
-    } catch (SolrException e) {
-      throw e;
-    } catch (Exception e) {
-      log.error("Error executing split operation for collection: " + collectionName + " parent shard: " + slice, e);
-      throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
-    }
-  }
-
-  private void commit(NamedList results, String slice, Replica parentShardLeader) {
+  void commit(NamedList results, String slice, Replica parentShardLeader) {
     log.info("Calling soft commit to make sub shard updates visible");
     String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
     // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
@@ -1406,7 +507,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
   }
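// commit() talks to the parent shard leader directly; a sketch of such a
// direct soft commit using stock SolrJ (an assumption, not the elided method
// body):
//
//   try (HttpSolrClient client = new HttpSolrClient(coreUrl)) {
//     UpdateRequest req = new UpdateRequest();
//     req.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, /* softCommit */ true);
//     client.request(req);
//   }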
 
-  private String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
+  String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
     int retryCount = 320;
     while (retryCount-- > 0) {
       Map<String,Slice> slicesMap = zkStateReader.getClusterState()
@@ -1435,7 +536,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
   }
 
-  private void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+  void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
     log.info("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
     RTimer timer = new RTimer();
     int retryCount = 320;
@@ -1459,339 +560,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     );
   }
 
-  private void deleteShard(ClusterState clusterState, ZkNodeProps message, NamedList results) {
-    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-    String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
-    
-    log.info("Delete shard invoked");
-    Slice slice = clusterState.getSlice(collectionName, sliceId);
-    
-    if (slice == null) {
-      if (clusterState.hasCollection(collectionName)) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,
-            "No shard with name " + sliceId + " exists for collection " + collectionName);
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
-      }
-    }
-    // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
-    // TODO: Add check for range gaps on Slice deletion
-    final Slice.State state = slice.getState();
-    if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
-        || state == Slice.State.CONSTRUCTION)) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
-          + ". Only non-active (or custom-hashed) slices can be deleted.");
-    }
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
-    String asyncId = message.getStr(ASYNC);
-    Map<String, String> requestMap = null;
-    if (asyncId != null) {
-      requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f);
-    }
-    
-    try {
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set(CoreAdminParams.ACTION, CoreAdminAction.UNLOAD.toString());
-      params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
-      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
-      params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
-
-      sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
-
-      processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap, Collections.emptySet());
-
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
-          collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
-      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-      
-      // wait for a while until we don't see the shard
-      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
-      boolean removed = false;
-      while (! timeout.hasTimedOut()) {
-        Thread.sleep(100);
-        DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-        removed = collection.getSlice(sliceId) == null;
-        if (removed) {
-          Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
-          break;
-        }
-      }
-      if (!removed) {
-        throw new SolrException(ErrorCode.SERVER_ERROR,
-            "Could not fully remove collection: " + collectionName + " shard: " + sliceId);
-      }
-      
-      log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
-      
-    } catch (SolrException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
-          "Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e);
-    }
-  }
-
-  private void migrate(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
-    String sourceCollectionName = message.getStr("collection");
-    String splitKey = message.getStr("split.key");
-    String targetCollectionName = message.getStr("target.collection");
-    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
-
-    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
-    if (sourceCollection == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
-    }
-    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
-    if (targetCollection == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown target collection: " + sourceCollectionName);
-    }
-    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter))  {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
-    }
-    if (!(targetCollection.getRouter() instanceof CompositeIdRouter))  {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
-    }
-    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
-    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
-    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
-    if (sourceSlices.isEmpty()) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          "No active slices available in source collection: " + sourceCollection + "for given split.key: " + splitKey);
-    }
-    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
-    if (targetSlices.isEmpty()) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
-    }
-
-    String asyncId = null;
-    if(message.containsKey(ASYNC) && message.get(ASYNC) != null)
-      asyncId = message.getStr(ASYNC);
-
-    for (Slice sourceSlice : sourceSlices) {
-      for (Slice targetSlice : targetSlices) {
-        log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
-        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
-            timeout, results, asyncId, message);
-      }
-    }
-  }
-
-  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
-                          DocCollection targetCollection, Slice targetSlice,
-                          String splitKey, int timeout,
-                          NamedList results, String asyncId, ZkNodeProps message) throws KeeperException, InterruptedException {
-    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
-    if (clusterState.hasCollection(tempSourceCollectionName)) {
-      log.info("Deleting temporary collection: " + tempSourceCollectionName);
-      Map<String, Object> props = makeMap(
-          Overseer.QUEUE_OPERATION, DELETE.toLower(),
-          NAME, tempSourceCollectionName);
-
-      try {
-        deleteCollection(new ZkNodeProps(props), results);
-        clusterState = zkStateReader.getClusterState();
-      } catch (Exception e) {
-        log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
-      }
-    }
-
-    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
-    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
-
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
-    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
-    // intersect source range, keyHashRange and target range
-    // this is the range that has to be split from source and transferred to target
-    DocRouter.Range splitRange = intersect(targetSlice.getRange(), intersect(sourceSlice.getRange(), keyHashRange));
-    if (splitRange == null) {
-      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
-      return;
-    }
-    log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
-
-    Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
-    // For tracking async calls.
-    Map<String, String> requestMap = new HashMap<>();
-
-    log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
-        + targetLeader.getStr("core") + " to buffer updates");
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
-    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
-
-    sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-    processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
-
-    ZkNodeProps m = new ZkNodeProps(
-        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
-        COLLECTION_PROP, sourceCollection.getName(),
-        SHARD_ID_PROP, sourceSlice.getName(),
-        "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
-        "range", splitRange.toString(),
-        "targetCollection", targetCollection.getName(),
-        "expireAt", RoutingRule.makeExpiryAt(timeout));
-    log.info("Adding routing rule: " + m);
-    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-
-    // wait for a while until we see the new rule
-    log.info("Waiting to see routing rule updated in clusterstate");
-    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS);
-    boolean added = false;
-    while (! waitUntil.hasTimedOut()) {
-      Thread.sleep(100);
-      sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
-      sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
-      Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
-      if (rules != null) {
-        RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
-        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
-          added = true;
-          break;
-        }
-      }
-    }
-    if (!added) {
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
-    }
-
-    log.info("Routing rule added successfully");
-
-    // Create temp core on source shard
-    Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
-
-    // create a temporary collection with just one node on the shard leader
-    String configName = zkStateReader.readConfigName(sourceCollection.getName());
-    Map<String, Object> props = makeMap(
-        Overseer.QUEUE_OPERATION, CREATE.toLower(),
-        NAME, tempSourceCollectionName,
-        REPLICATION_FACTOR, 1,
-        NUM_SLICES, 1,
-        COLL_CONF, configName,
-        CREATE_NODE_SET, sourceLeader.getNodeName());
-    if (asyncId != null) {
-      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
-      props.put(ASYNC, internalAsyncId);
-    }
-
-    log.info("Creating temporary collection: " + props);
-    createCollection(clusterState, new ZkNodeProps(props), results);
-    // refresh cluster state
-    clusterState = zkStateReader.getClusterState();
-    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
-    Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
-
-    String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
-    String coreNodeName = waitForCoreNodeName(tempSourceCollectionName,
-        sourceLeader.getNodeName(), tempCollectionReplica1);
-    // wait for the replicas to be seen as active on temp source leader
-    log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
-    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-    cmd.setCoreName(tempCollectionReplica1);
-    cmd.setNodeName(sourceLeader.getNodeName());
-    cmd.setCoreNodeName(coreNodeName);
-    cmd.setState(Replica.State.ACTIVE);
-    cmd.setCheckLive(true);
-    cmd.setOnlyIfLeader(true);
-    // we don't want this to happen asynchronously
-    sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
-
-    processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
-        " or timed out waiting for it to come up", asyncId, requestMap);
-
-    log.info("Asking source leader to split index");
-    params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.SPLIT.toString());
-    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
-    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
-    params.set(CoreAdminParams.RANGES, splitRange.toString());
-    params.set("split.key", splitKey);
-
-    String tempNodeName = sourceLeader.getNodeName();
-
-    sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
-    processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
-
-    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
-        tempSourceCollectionName, targetLeader.getNodeName());
-    String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
-    props = new HashMap<>();
-    props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
-    props.put(COLLECTION_PROP, tempSourceCollectionName);
-    props.put(SHARD_ID_PROP, tempSourceSlice.getName());
-    props.put("node", targetLeader.getNodeName());
-    props.put(CoreAdminParams.NAME, tempCollectionReplica2);
-    // copy over property params:
-    for (String key : message.keySet()) {
-      if (key.startsWith(COLL_PROP_PREFIX)) {
-        props.put(key, message.getStr(key));
-      }
-    }
-    // add async param
-    if(asyncId != null) {
-      props.put(ASYNC, asyncId);
-    }
-    addReplica(clusterState, new ZkNodeProps(props), results, null);
-
-    processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
-        "temporary collection in target leader node.", asyncId, requestMap);
-
-    coreNodeName = waitForCoreNodeName(tempSourceCollectionName,
-        targetLeader.getNodeName(), tempCollectionReplica2);
-    // wait for the replicas to be seen as active on temp source leader
-    log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
-    cmd = new CoreAdminRequest.WaitForState();
-    cmd.setCoreName(tempSourceLeader.getStr("core"));
-    cmd.setNodeName(targetLeader.getNodeName());
-    cmd.setCoreNodeName(coreNodeName);
-    cmd.setState(Replica.State.ACTIVE);
-    cmd.setCheckLive(true);
-    cmd.setOnlyIfLeader(true);
-    params = new ModifiableSolrParams(cmd.getParams());
-
-    sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-    processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
-        " replica or timed out waiting for them to come up", asyncId, requestMap);
-
-    log.info("Successfully created replica of temp source collection on target leader node");
-
-    log.info("Requesting merge of temp source collection replica to target leader");
-    params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.MERGEINDEXES.toString());
-    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
-    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
-
-    sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
-        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
-    processResponses(results, shardHandler, true, msg, asyncId, requestMap);
-
-    log.info("Asking target leader to apply buffered updates");
-    params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTAPPLYUPDATES.toString());
-    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
-
-    sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
-        asyncId, requestMap);
-
-    try {
-      log.info("Deleting temporary collection: " + tempSourceCollectionName);
-      props = makeMap(
-          Overseer.QUEUE_OPERATION, DELETE.toLower(),
-          NAME, tempSourceCollectionName);
-      deleteCollection(new ZkNodeProps(props), results);
-    } catch (Exception e) {
-      log.error("Unable to delete temporary collection: " + tempSourceCollectionName
-          + ". Please remove it manually", e);
-    }
-  }
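
To summarize the migrateKey() flow being moved out of this class: buffer updates on
the target leader, add a routing rule on the source shard, create a single-replica
temp collection co-located with the source leader, SPLIT the matching hash range into
it, add a replica of the temp shard on the target leader's node, MERGEINDEXES into
the target core, apply the buffered updates, and finally delete the temp collection.
The routing-rule step enqueues a message like the following (collection and shard
names here are hypothetical):

    ZkNodeProps rule = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
        COLLECTION_PROP, "sourceCollection",
        SHARD_ID_PROP, "shard1",
        "routeKey", "a!",               // SolrIndexSplitter.getRouteKey(splitKey) + "!"
        "range", splitRange.toString(),
        "targetCollection", "targetCollection",
        "expireAt", RoutingRule.makeExpiryAt(timeout));
    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(rule));
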
-
-  private DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
+  DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
     if (a == null || b == null || !a.overlaps(b)) {
       return null;
     } else if (a.isSubsetOf(b))
@@ -1805,9 +574,9 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
   }
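
A quick worked example of the intersect() helper above, using small made-up values in
place of real 32-bit hash ranges; the doubly-nested call in migrateKey() narrows the
migration down to the hashes all three ranges share:

    DocRouter.Range source = new DocRouter.Range(0, 100);   // source slice range
    DocRouter.Range key    = new DocRouter.Range(50, 200);  // keyHashRange(splitKey)
    DocRouter.Range target = new DocRouter.Range(80, 300);  // target slice range
    // intersect(source, key)       -> [50, 100]
    // intersect(target, [50,100])  -> [80, 100], the range actually migrated
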
 
-  private void sendShardRequest(String nodeName, ModifiableSolrParams params,
-                                ShardHandler shardHandler, String asyncId,
-                                Map<String, String> requestMap) {
+  void sendShardRequest(String nodeName, ModifiableSolrParams params,
+                        ShardHandler shardHandler, String asyncId,
+                        Map<String, String> requestMap) {
     sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
 
   }
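
The asyncId/requestMap pair threading through these helpers is the async bookkeeping:
each sub-request gets its own id and requestMap records which node it was sent to, so
processResponses() can collect the outcomes later. A condensed sketch of the pattern
as it appears throughout the commands (the failure message is illustrative):

    Map<String, String> requestMap = new HashMap<>(); // node name -> async request id
    ModifiableSolrParams params = new ModifiableSolrParams();
    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTBUFFERUPDATES.toString());
    sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap);
    processResponses(results, shardHandler, true, "<failure message>", asyncId, requestMap);
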
@@ -1833,7 +602,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     shardHandler.submit(sreq, replica, sreq.params);
   }
 
-  private void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
+  void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
     // Now add the property.key=value pairs
     for (String key : message.keySet()) {
       if (key.startsWith(COLL_PROP_PREFIX)) {
@@ -1842,7 +611,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
   }
 
-  private void addPropertyParams(ZkNodeProps message, Map<String,Object> map) {
+  void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
     // Now add the property.key=value pairs
     for (String key : message.keySet()) {
       if (key.startsWith(COLL_PROP_PREFIX)) {
@@ -1851,7 +620,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
     }
   }
 
-  private static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
+  static List<String> getLiveOrLiveAndCreateNodeSetList(final Set<String> liveNodes, final ZkNodeProps message, final Random random) {
     // TODO: add smarter options that look at the current number of cores per
     // node?
     // for now we just go random (except when createNodeSet and createNodeSet.shuffle=false are passed in)
@@ -1874,9 +643,10 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
 
     return nodeList;
   }
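
A minimal sketch of how callers use this: with no createNodeSet in the message the
result is all live nodes shuffled with the supplied Random; with createNodeSet it is
just the named nodes, shuffled unless createNodeSet.shuffle=false preserves the given
order.

    List<String> nodeList = getLiveOrLiveAndCreateNodeSetList(
        clusterState.getLiveNodes(), message, RANDOM);
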
-  
-  
-  private void modifyCollection(ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
+
+
+  private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws KeeperException, InterruptedException {
     
     final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
     //the rest of the processing is based on writing cluster state properties
@@ -1888,223 +658,25 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
       
       boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
       createConfNode(configName, collectionName, isLegacyCloud);
-      reloadCollection(new ZkNodeProps(NAME, collectionName), results);
+      reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
     }
     
     overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
   }
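
modifyCollection() has two paths: a configName change triggers createConfNode() plus a
collection reload, while everything else is handled by enqueueing the message so the
Overseer writes the properties into cluster state. A hypothetical message (collection
name and property values are illustrative):

    ZkNodeProps modify = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toLower(),
        ZkStateReader.COLLECTION_PROP, "myCollection",
        ZkStateReader.REPLICATION_FACTOR, "2");
    overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(modify));
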
 
-  private void createCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) throws KeeperException, InterruptedException {
-    final String collectionName = message.getStr(NAME);
-    log.info("Create collection {}", collectionName);
-    if (clusterState.hasCollection(collectionName)) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
-    }
-
-    String configName = getConfigName(collectionName, message);
-    if (configName == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
-    }
-    
-    validateConfigOrThrowSolrException(configName);
-    
-
-    try {
-      // look at the replication factor and see if it matches reality
-      // if it does not, find best nodes to create more cores
-
-      int repFactor = message.getInt(REPLICATION_FACTOR, 1);
-
-      ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-      final String async = message.getStr(ASYNC);
-
-      Integer numSlices = message.getInt(NUM_SLICES, null);
-      String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
-      List<String> shardNames = new ArrayList<>();
-      if(ImplicitDocRouter.NAME.equals(router)){
-        ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
-        numSlices = shardNames.size();
-      } else {
-        if (numSlices == null ) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
-        }
-        ClusterStateMutator.getShardNames(numSlices, shardNames);
-      }
-
-      int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
-
-      if (repFactor <= 0) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
-      }
-
-      if (numSlices <= 0) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
-      }
-
-      // we need to look at every node and see how many cores it serves
-      // add our new cores to existing nodes serving the least number of cores
-      // but (for now) require that each core goes on a distinct node.
-
-      final List<String> nodeList = getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
-      Map<Position, String> positionVsNodes;
-      if (nodeList.isEmpty()) {
-        log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
-
-        positionVsNodes = new HashMap<>();
-      } else {
-        if (repFactor > nodeList.size()) {
-          log.warn("Specified "
-              + REPLICATION_FACTOR
-              + " of "
-              + repFactor
-              + " on collection "
-              + collectionName
-              + " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
-              + nodeList.size()
-              + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
-        }
-        
-        int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
-        int requestedShardsToCreate = numSlices * repFactor;
-        if (maxShardsAllowedToCreate < requestedShardsToCreate) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
-              + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
-              + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
-              + ". This allows a maximum of " + maxShardsAllowedToCreate
-              + " to be created. Value of " + NUM_SLICES + " is " + numSlices
-              + " and value of " + REPLICATION_FACTOR + " is " + repFactor
-              + ". This requires " + requestedShardsToCreate
-              + " shards to be created (higher than the allowed number)");
-        }
-
-        positionVsNodes = identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
-      }
-
-      boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
-
-      createConfNode(configName, collectionName, isLegacyCloud);
-
-      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
-
-      // wait for a while until we don't see the collection
-      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS);
-      boolean created = false;
-      while (! waitUntil.hasTimedOut()) {
-        Thread.sleep(100);
-        created = zkStateReader.getClusterState().hasCollection(collectionName);
-        if(created) break;
-      }
-      if (!created)
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
-
-      if (nodeList.isEmpty()) {
-        log.info("Finished create command for collection: {}", collectionName);
-        return;
-      }
-
-      // For tracking async calls.
-      Map<String, String> requestMap = new HashMap<>();
-
-
-      log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
-          collectionName, shardNames, repFactor));
-      Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
-      for (Map.Entry<Position, String> e : positionVsNodes.entrySet()) {
-        Position position = e.getKey();
-        String nodeName = e.getValue();
-        String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
-        log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
-            , coreName, position.shard, collectionName, nodeName));
-
-
-        String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
-        //in the new mode, create the replica in clusterstate prior to creating the core.
-        // Otherwise the core creation fails
-        if (!isLegacyCloud) {
-          ZkNodeProps props = new ZkNodeProps(
-              Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
-              ZkStateReader.COLLECTION_PROP, collectionName,
-              ZkStateReader.SHARD_ID_PROP, position.shard,
-              ZkStateReader.CORE_NAME_PROP, coreName,
-              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-              ZkStateReader.BASE_URL_PROP, baseUrl);
-          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
-        }
-
-        // Need to create new params for each request
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminAction.CREATE.toString());
-
-        params.set(CoreAdminParams.NAME, coreName);
-        params.set(COLL_CONF, configName);
-        params.set(CoreAdminParams.COLLECTION, collectionName);
-        params.set(CoreAdminParams.SHARD, position.shard);
-        params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
-
-        if (async != null) {
-          String coreAdminAsyncId = async + Math.abs(System.nanoTime());
-          params.add(ASYNC, coreAdminAsyncId);
-          requestMap.put(nodeName, coreAdminAsyncId);
-        }
-        addPropertyParams(message, params);
-
-        ShardRequest sreq = new ShardRequest();
-        sreq.nodeName = nodeName;
-        params.set("qt", adminPath);
-        sreq.purpose = 1;
-        sreq.shards = new String[]{baseUrl};
-        sreq.actualShards = sreq.shards;
-        sreq.params = params;
-
-        if (isLegacyCloud) {
-          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
-        } else {
-          coresToCreate.put(coreName, sreq);
-        }
-      }
-
-      if(!isLegacyCloud) {
-        // wait for all replica entries to be created
-        Map<String, Replica> replicas = waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
-        for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
-          ShardRequest sreq = e.getValue();
-          sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
-          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
-        }
-      }
-
-      processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
-      if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
-        // Let's cleanup as we hit an exception
-        // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
-        // element, which may be interpreted by the user as a positive ack
-        cleanupCollection(collectionName, new NamedLi

<TRUNCATED>

[3/3] lucene-solr:master: SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes

Posted by no...@apache.org.
SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/bbd1efe5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/bbd1efe5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/bbd1efe5

Branch: refs/heads/master
Commit: bbd1efe5d8f547e0503d2a39abbfd3849019c77f
Parents: 9e1a25e
Author: Noble Paul <no...@apache.org>
Authored: Fri Aug 19 11:12:29 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Fri Aug 19 11:14:25 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |    2 +
 .../org/apache/solr/cloud/AddReplicaCmd.java    |  192 ++
 .../java/org/apache/solr/cloud/BackupCmd.java   |  132 ++
 .../org/apache/solr/cloud/CreateAliasCmd.java   |  101 +
 .../apache/solr/cloud/CreateCollectionCmd.java  |  291 +++
 .../org/apache/solr/cloud/CreateShardCmd.java   |  120 ++
 .../org/apache/solr/cloud/DeleteAliasCmd.java   |   95 +
 .../apache/solr/cloud/DeleteCollectionCmd.java  |  121 ++
 .../org/apache/solr/cloud/DeleteNodeCmd.java    |    3 +-
 .../org/apache/solr/cloud/DeleteReplicaCmd.java |  155 ++
 .../org/apache/solr/cloud/DeleteShardCmd.java   |  126 ++
 .../java/org/apache/solr/cloud/MigrateCmd.java  |  333 +++
 .../cloud/OverseerCollectionMessageHandler.java | 2012 +-----------------
 .../org/apache/solr/cloud/OverseerRoleCmd.java  |  102 +
 .../apache/solr/cloud/OverseerStatusCmd.java    |  122 ++
 .../java/org/apache/solr/cloud/RestoreCmd.java  |  243 +++
 .../org/apache/solr/cloud/SplitShardCmd.java    |  458 ++++
 17 files changed, 2695 insertions(+), 1913 deletions(-)
----------------------------------------------------------------------
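
Every new class below implements OverseerCollectionMessageHandler.Cmd and keeps a
reference back to the handler so it can reuse the shared helpers (sendShardRequest,
processResponses, and so on). The interface itself is not visible in this part of the
diff; judging from the implementations it presumably amounts to:

    interface Cmd {
      void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
    }
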


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7458f46..fccfa43 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -282,6 +282,8 @@ Other Changes
 
 * SOLR-9404: Refactor move/renames in JSON FacetProcessor and FacetFieldProcessor. (David Smiley)
 
+* SOLR-9421: Refactored out OverseerCollectionMessageHandler to smaller classes (noble)
+
 ==================  6.1.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
new file mode 100644
index 0000000..6bb3350
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/AddReplicaCmd.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.ShardParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class AddReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public AddReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    addReplica(ocmh.zkStateReader.getClusterState(), message, results, null);
+  }
+
+  ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+      throws KeeperException, InterruptedException {
+    log.info("addReplica() : {}", Utils.toJSONString(message));
+    String collection = message.getStr(COLLECTION_PROP);
+    String node = message.getStr(CoreAdminParams.NODE);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String coreName = message.getStr(CoreAdminParams.NAME);
+    boolean parallel = message.getBool("parallel", false);
+    if (StringUtils.isBlank(coreName)) {
+      coreName = message.getStr(CoreAdminParams.PROPERTY_PREFIX + CoreAdminParams.NAME);
+    }
+
+    final String asyncId = message.getStr(ASYNC);
+
+    DocCollection coll = clusterState.getCollection(collection);
+    if (coll == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+    }
+    if (coll.getSlice(shard) == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Collection: " + collection + " shard: " + shard + " does not exist");
+    }
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    boolean skipCreateReplicaInClusterState = message.getBool(SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, false);
+
+    // Kind of unnecessary, but it does put the logic of whether to override maxShardsPerNode in one place.
+    if (!skipCreateReplicaInClusterState) {
+      node = getNodesForNewReplicas(clusterState, collection, shard, 1, node,
+          ocmh.overseer.getZkController().getCoreContainer()).get(0).nodeName;
+    }
+    log.info("Node Identified {} for creating new replica", node);
+
+    if (!clusterState.liveNodesContain(node)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Node: " + node + " is not live");
+    }
+    if (coreName == null) {
+      coreName = Assign.buildCoreName(coll, shard);
+    } else if (!skipCreateReplicaInClusterState) {
+      //Validate that the core name is unique in that collection
+      for (Slice slice : coll.getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          String replicaCoreName = replica.getStr(CORE_NAME_PROP);
+          if (coreName.equals(replicaCoreName)) {
+            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Another replica with the same core name already exists" +
+                " for this collection");
+          }
+        }
+      }
+    }
+    ModifiableSolrParams params = new ModifiableSolrParams();
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    if (!Overseer.isLegacy(zkStateReader)) {
+      if (!skipCreateReplicaInClusterState) {
+        ZkNodeProps props = new ZkNodeProps(
+            Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
+            ZkStateReader.COLLECTION_PROP, collection,
+            ZkStateReader.SHARD_ID_PROP, shard,
+            ZkStateReader.CORE_NAME_PROP, coreName,
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(node),
+            ZkStateReader.NODE_NAME_PROP, node);
+        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+      }
+      params.set(CoreAdminParams.CORE_NODE_NAME,
+          ocmh.waitToSeeReplicasInState(collection, Collections.singletonList(coreName)).get(coreName).getName());
+    }
+
+    String configName = zkStateReader.readConfigName(collection);
+    String routeKey = message.getStr(ShardParams._ROUTE_);
+    String dataDir = message.getStr(CoreAdminParams.DATA_DIR);
+    String instanceDir = message.getStr(CoreAdminParams.INSTANCE_DIR);
+
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+    params.set(CoreAdminParams.NAME, coreName);
+    params.set(COLL_CONF, configName);
+    params.set(CoreAdminParams.COLLECTION, collection);
+    if (shard != null) {
+      params.set(CoreAdminParams.SHARD, shard);
+    } else if (routeKey != null) {
+      Collection<Slice> slices = coll.getRouter().getSearchSlicesSingle(routeKey, null, coll);
+      if (slices.isEmpty()) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No active shard serving _route_=" + routeKey + " found");
+      } else {
+        params.set(CoreAdminParams.SHARD, slices.iterator().next().getName());
+      }
+    } else {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Specify either 'shard' or _route_ param");
+    }
+    if (dataDir != null) {
+      params.set(CoreAdminParams.DATA_DIR, dataDir);
+    }
+    if (instanceDir != null) {
+      params.set(CoreAdminParams.INSTANCE_DIR, instanceDir);
+    }
+    ocmh.addPropertyParams(message, params);
+
+    // For tracking async calls.
+    Map<String,String> requestMap = new HashMap<>();
+    ocmh.sendShardRequest(node, params, shardHandler, asyncId, requestMap);
+
+    final String fnode = node;
+    final String fcoreName = coreName;
+
+    Runnable runnable = () -> {
+      ocmh.processResponses(results, shardHandler, true, "ADDREPLICA failed to create replica", asyncId, requestMap);
+      ocmh.waitForCoreNodeName(collection, fnode, fcoreName);
+      if (onComplete != null) onComplete.run();
+    };
+
+    if (!parallel) {
+      runnable.run();
+    } else {
+      ocmh.tpe.submit(runnable);
+    }
+
+
+    return new ZkNodeProps(
+        ZkStateReader.COLLECTION_PROP, collection,
+        ZkStateReader.SHARD_ID_PROP, shard,
+        ZkStateReader.CORE_NAME_PROP, coreName,
+        ZkStateReader.NODE_NAME_PROP, node
+    );
+  }
+}
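
A hypothetical message for the command above, built only from the keys it reads (all
names and values are illustrative):

    ZkNodeProps msg = new ZkNodeProps(
        Overseer.QUEUE_OPERATION, ADDREPLICA.toLower(),
        ZkStateReader.COLLECTION_PROP, "myCollection",
        ZkStateReader.SHARD_ID_PROP, "shard1",
        CoreAdminParams.NODE, "host1:8983_solr",
        CoreAdminParams.NAME, "myCollection_shard1_replica3",
        ASYNC, "addreplica-42");
    new AddReplicaCmd(ocmh).call(clusterState, msg, results);
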

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java b/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
new file mode 100644
index 0000000..679cb07
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/BackupCmd.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class BackupCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public BackupCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String backupName = message.getStr(NAME);
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    String asyncId = message.getStr(ASYNC);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    String location = message.getStr(CoreAdminParams.BACKUP_LOCATION);
+
+    Map<String, String> requestMap = new HashMap<>();
+    Instant startTime = Instant.now();
+
+    CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+    BackupManager backupMgr = new BackupManager(repository, ocmh.zkStateReader, collectionName);
+
+    // Backup location
+    URI backupPath = repository.createURI(location, backupName);
+
+    // Fail fast if the backup directory already exists.
+    if (repository.exists(backupPath)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The backup directory already exists: " + backupPath);
+    }
+
+    // Create a directory to store backup details.
+    repository.createDirectory(backupPath);
+
+    log.info("Starting backup of collection={} with backupName={} at location={}", collectionName, backupName,
+        backupPath);
+
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getActiveSlices()) {
+      Replica replica = slice.getLeader();
+
+      String coreName = replica.getStr(CORE_NAME_PROP);
+
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.BACKUPCORE.toString());
+      params.set(NAME, slice.getName());
+      params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+      params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.getPath()); // note: the index snapshot is written here, under "snapshot." + slice name
+      params.set(CORE_NAME_PROP, coreName);
+
+      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+      log.debug("Sent backup request to core={} for backupName={}", coreName, backupName);
+    }
+    log.debug("Sent backup requests to all shard leaders for backupName={}", backupName);
+
+    ocmh.processResponses(results, shardHandler, true, "Could not backup all replicas", asyncId, requestMap);
+
+    log.info("Starting to backup ZK data for backupName={}", backupName);
+
+    //Download the configs
+    String configName = ocmh.zkStateReader.readConfigName(collectionName);
+    backupMgr.downloadConfigDir(location, backupName, configName);
+
+    // Save the collection's state. It can be part of the monolithic clusterstate.json or an individual state.json.
+    // Since we don't want to distinguish between the two, we extract the state and back it up as a separate json.
+    DocCollection collectionState = ocmh.zkStateReader.getClusterState().getCollection(collectionName);
+    backupMgr.writeCollectionState(location, backupName, collectionName, collectionState);
+
+    Properties properties = new Properties();
+
+    properties.put(BackupManager.BACKUP_NAME_PROP, backupName);
+    properties.put(BackupManager.COLLECTION_NAME_PROP, collectionName);
+    properties.put(COLL_CONF, configName);
+    properties.put(BackupManager.START_TIME_PROP, startTime.toString());
+    //TODO: Add MD5 of the configset. If during restore the same name configset exists then we can compare checksums to see if they are the same.
+    //if they are not the same then we can throw an error or have an 'overwriteConfig' flag
+    //TODO save numDocs for the shardLeader. We can use it to sanity check the restore.
+
+    backupMgr.writeBackupProperties(location, backupName, properties);
+
+    log.info("Completed backing up ZK data for backupName={}", backupName);
+  }
+}
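
Piecing together the repository and BackupManager calls above, a completed backup
directory ends up looking roughly like this (exact file names live inside
BackupManager, so treat the layout as approximate):

    <location>/<backupName>/
        backup.properties         <- writeBackupProperties(...)
        <collection state json>   <- writeCollectionState(...)
        <copy of the configset>   <- downloadConfigDir(...)
        snapshot.<sliceName>/     <- one BACKUPCORE snapshot per shard leader
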

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
new file mode 100644
index 0000000..b966ebd
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateAliasCmd.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+
+public class CreateAliasCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results)
+      throws Exception {
+    String aliasName = message.getStr(NAME);
+    String collections = message.getStr("collections");
+
+    Map<String, Map<String, String>> newAliasesMap = new HashMap<>();
+    Map<String, String> newCollectionAliasesMap = new HashMap<>();
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    Map<String, String> prevColAliases = zkStateReader.getAliases().getCollectionAliasMap();
+    if (prevColAliases != null) {
+      newCollectionAliasesMap.putAll(prevColAliases);
+    }
+    newCollectionAliasesMap.put(aliasName, collections);
+    newAliasesMap.put("collection", newCollectionAliasesMap);
+    Aliases newAliases = new Aliases(newAliasesMap);
+    byte[] jsonBytes = null;
+    if (newAliases.collectionAliasSize() > 0) { // only sub map right now
+      jsonBytes = Utils.toJSON(newAliases.getAliasMap());
+    }
+    try {
+      zkStateReader.getZkClient().setData(ZkStateReader.ALIASES, jsonBytes, true);
+
+      checkForAlias(aliasName, collections);
+      // small delay to give other nodes a chance to observe the new alias
+      Thread.sleep(100);
+    } catch (KeeperException e) {
+      log.error("", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } catch (InterruptedException e) {
+      log.warn("", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private void checkForAlias(String name, String value) {
+
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+    boolean success = false;
+    Aliases aliases;
+    while (!timeout.hasTimedOut()) {
+      aliases = ocmh.zkStateReader.getAliases();
+      String collections = aliases.getCollectionAlias(name);
+      if (collections != null && collections.equals(value)) {
+        success = true;
+        break;
+      }
+    }
+    if (!success) {
+      log.warn("Timeout waiting to be notified of Alias change...");
+    }
+  }
+}
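
For reference, the bytes this writes to the aliases znode are a one-level JSON map
keyed by alias type, of which "collection" is currently the only one, e.g.:

    {"collection":{"myAlias":"collection1,collection2"}}
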

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
new file mode 100644
index 0000000..7f28600
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateCollectionCmd.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.rule.ReplicaAssigner;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.StrUtils.formatString;
+
+public class CreateCollectionCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    final String collectionName = message.getStr(NAME);
+    log.info("Create collection {}", collectionName);
+    if (clusterState.hasCollection(collectionName)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
+    }
+
+    String configName = getConfigName(collectionName, message);
+    if (configName == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No config set found to associate with the collection.");
+    }
+
+    ocmh.validateConfigOrThrowSolrException(configName);
+
+
+    try {
+      // look at the replication factor and see if it matches reality
+      // if it does not, find best nodes to create more cores
+
+      int repFactor = message.getInt(REPLICATION_FACTOR, 1);
+
+      ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+      final String async = message.getStr(ASYNC);
+
+      Integer numSlices = message.getInt(NUM_SLICES, null);
+      String router = message.getStr("router.name", DocRouter.DEFAULT_NAME);
+      List<String> shardNames = new ArrayList<>();
+      if(ImplicitDocRouter.NAME.equals(router)){
+        ClusterStateMutator.getShardNames(shardNames, message.getStr("shards", null));
+        numSlices = shardNames.size();
+      } else {
+        if (numSlices == null ) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " is a required param (when using CompositeId router).");
+        }
+        ClusterStateMutator.getShardNames(numSlices, shardNames);
+      }
+
+      int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, 1);
+
+      if (repFactor <= 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, REPLICATION_FACTOR + " must be greater than 0");
+      }
+
+      if (numSlices <= 0) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, NUM_SLICES + " must be > 0");
+      }
+
+      // we need to look at every node and see how many cores it serves
+      // add our new cores to existing nodes serving the least number of cores
+      // but (for now) require that each core goes on a distinct node.
+
+      final List<String> nodeList = OverseerCollectionMessageHandler.getLiveOrLiveAndCreateNodeSetList(clusterState.getLiveNodes(), message, RANDOM);
+      Map<ReplicaAssigner.Position, String> positionVsNodes;
+      if (nodeList.isEmpty()) {
+        log.warn("It is unusual to create a collection ("+collectionName+") without cores.");
+
+        positionVsNodes = new HashMap<>();
+      } else {
+        if (repFactor > nodeList.size()) {
+          log.warn("Specified "
+              + REPLICATION_FACTOR
+              + " of "
+              + repFactor
+              + " on collection "
+              + collectionName
+              + " is higher than or equal to the number of Solr instances currently live or live and part of your " + CREATE_NODE_SET + "("
+              + nodeList.size()
+              + "). It's unusual to run two replica of the same slice on the same Solr-instance.");
+        }
+
+        int maxShardsAllowedToCreate = maxShardsPerNode * nodeList.size();
+        int requestedShardsToCreate = numSlices * repFactor;
+        if (maxShardsAllowedToCreate < requestedShardsToCreate) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create collection " + collectionName + ". Value of "
+              + MAX_SHARDS_PER_NODE + " is " + maxShardsPerNode
+              + ", and the number of nodes currently live or live and part of your "+CREATE_NODE_SET+" is " + nodeList.size()
+              + ". This allows a maximum of " + maxShardsAllowedToCreate
+              + " to be created. Value of " + NUM_SLICES + " is " + numSlices
+              + " and value of " + REPLICATION_FACTOR + " is " + repFactor
+              + ". This requires " + requestedShardsToCreate
+              + " shards to be created (higher than the allowed number)");
+        }
+
+        positionVsNodes = ocmh.identifyNodes(clusterState, nodeList, message, shardNames, repFactor);
+      }
+
+      ZkStateReader zkStateReader = ocmh.zkStateReader;
+      boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
+
+      ocmh.createConfNode(configName, collectionName, isLegacyCloud);
+
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+      // wait a while for the collection to appear in cluster state
+      TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS);
+      boolean created = false;
+      while (! waitUntil.hasTimedOut()) {
+        Thread.sleep(100);
+        created = zkStateReader.getClusterState().hasCollection(collectionName);
+        if(created) break;
+      }
+      if (!created)
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+      if (nodeList.isEmpty()) {
+        log.info("Finished create command for collection: {}", collectionName);
+        return;
+      }
+
+      // For tracking async calls.
+      Map<String, String> requestMap = new HashMap<>();
+
+
+      log.info(formatString("Creating SolrCores for new collection {0}, shardNames {1} , replicationFactor : {2}",
+          collectionName, shardNames, repFactor));
+      Map<String,ShardRequest> coresToCreate = new LinkedHashMap<>();
+      for (Map.Entry<ReplicaAssigner.Position, String> e : positionVsNodes.entrySet()) {
+        ReplicaAssigner.Position position = e.getKey();
+        String nodeName = e.getValue();
+        String coreName = collectionName + "_" + position.shard + "_replica" + (position.index + 1);
+        log.info(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
+            , coreName, position.shard, collectionName, nodeName));
+
+
+        String baseUrl = zkStateReader.getBaseUrlForNodeName(nodeName);
+        // in the new mode, create the replica in cluster state prior to creating the core,
+        // otherwise the core creation fails
+        if (!isLegacyCloud) {
+          ZkNodeProps props = new ZkNodeProps(
+              Overseer.QUEUE_OPERATION, ADDREPLICA.toString(),
+              ZkStateReader.COLLECTION_PROP, collectionName,
+              ZkStateReader.SHARD_ID_PROP, position.shard,
+              ZkStateReader.CORE_NAME_PROP, coreName,
+              ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+              ZkStateReader.BASE_URL_PROP, baseUrl);
+          Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(props));
+        }
+
+        // Need to create new params for each request
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+
+        params.set(CoreAdminParams.NAME, coreName);
+        params.set(COLL_CONF, configName);
+        params.set(CoreAdminParams.COLLECTION, collectionName);
+        params.set(CoreAdminParams.SHARD, position.shard);
+        params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+
+        if (async != null) {
+          String coreAdminAsyncId = async + Math.abs(System.nanoTime());
+          params.add(ASYNC, coreAdminAsyncId);
+          requestMap.put(nodeName, coreAdminAsyncId);
+        }
+        ocmh.addPropertyParams(message, params);
+
+        ShardRequest sreq = new ShardRequest();
+        sreq.nodeName = nodeName;
+        params.set("qt", ocmh.adminPath);
+        sreq.purpose = 1;
+        sreq.shards = new String[]{baseUrl};
+        sreq.actualShards = sreq.shards;
+        sreq.params = params;
+
+        if (isLegacyCloud) {
+          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+        } else {
+          coresToCreate.put(coreName, sreq);
+        }
+      }
+
+      if(!isLegacyCloud) {
+        // wait for all replica entries to be created
+        Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
+        for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
+          ShardRequest sreq = e.getValue();
+          sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
+          shardHandler.submit(sreq, sreq.shards[0], sreq.params);
+        }
+      }
+
+      ocmh.processResponses(results, shardHandler, false, null, async, requestMap, Collections.emptySet());
+      if(results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0) {
+        // Let's cleanup as we hit an exception
+        // We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
+        // element, which may be interpreted by the user as a positive ack
+        ocmh.cleanupCollection(collectionName, new NamedList());
+        log.info("Cleaned up  artifacts for failed create collection for [" + collectionName + "]");
+      } else {
+        log.debug("Finished create command on all shards for collection: "
+            + collectionName);
+      }
+    } catch (SolrException ex) {
+      throw ex;
+    } catch (Exception ex) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, null, ex);
+    }
+  }
+
+  String getConfigName(String coll, ZkNodeProps message) throws KeeperException, InterruptedException {
+    String configName = message.getStr(COLL_CONF);
+
+    if (configName == null) {
+      // if there is only one conf, use that
+      List<String> configNames = null;
+      try {
+        configNames = ocmh.zkStateReader.getZkClient().getChildren(ZkConfigManager.CONFIGS_ZKNODE, null, true);
+        if (configNames != null && configNames.size() == 1) {
+          // no config set named, but there is only one - use it
+          configName = configNames.get(0);
+          log.info("Only one config set found in zk - using it: " + configName);
+        } else if (configNames != null && configNames.contains(coll)) {
+          configName = coll;
+        }
+      } catch (KeeperException.NoNodeException e) {
+        // no configs node in zk yet - leave configName null
+      }
+    }
+    return configName;
+  }
+}
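
Each of the classes in this patch implements the same single-method callback, which is how OverseerCollectionMessageHandler can dispatch every CollectionAction uniformly. A minimal sketch of that contract, inferred from the call(...) overrides in these files (the actual declaration lives inside OverseerCollectionMessageHandler):

    import org.apache.solr.common.cloud.ClusterState;
    import org.apache.solr.common.cloud.ZkNodeProps;
    import org.apache.solr.common.util.NamedList;

    // Inferred from the @Override methods in this patch; the real interface
    // is nested in OverseerCollectionMessageHandler.
    public interface Cmd {
      void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
    }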

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
new file mode 100644
index 0000000..3d5aa41
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/CreateShardCmd.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.Assign.getNodesForNewReplicas;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class CreateShardCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public CreateShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String sliceName = message.getStr(SHARD_ID_PROP);
+
+    log.info("Create shard invoked: {}", message);
+    if (collectionName == null || sliceName == null)
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'collection' and 'shard' are required parameters");
+    int numSlices = 1;
+
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    DocCollection collection = clusterState.getCollection(collectionName);
+    int repFactor = message.getInt(REPLICATION_FACTOR, collection.getInt(REPLICATION_FACTOR, 1));
+    String createNodeSetStr = message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET);
+    List<Assign.ReplicaCount> sortedNodeList = getNodesForNewReplicas(clusterState, collectionName, sliceName, repFactor,
+        createNodeSetStr, ocmh.overseer.getZkController().getCoreContainer());
+
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+    // wait for a while until we see the shard
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+    boolean created = false;
+    while (!timeout.hasTimedOut()) {
+      Thread.sleep(100);
+      created = zkStateReader.getClusterState().getCollection(collectionName).getSlice(sliceName) != null;
+      if (created) break;
+    }
+    if (!created)
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create shard: " + message.getStr(NAME));
+
+    String configName = message.getStr(COLL_CONF);
+
+    String async = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (async != null) {
+      requestMap = new HashMap<>(repFactor, 1.0f);
+    }
+
+    for (int j = 1; j <= repFactor; j++) {
+      String nodeName = sortedNodeList.get((j - 1) % sortedNodeList.size()).nodeName;
+      String shardName = collectionName + "_" + sliceName + "_replica" + j;
+      log.info("Creating shard " + shardName + " as part of slice " + sliceName + " of collection " + collectionName
+          + " on " + nodeName);
+
+      // Need to create new params for each request
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.CREATE.toString());
+      params.set(CoreAdminParams.NAME, shardName);
+      params.set(COLL_CONF, configName);
+      params.set(CoreAdminParams.COLLECTION, collectionName);
+      params.set(CoreAdminParams.SHARD, sliceName);
+      params.set(ZkStateReader.NUM_SHARDS_PROP, numSlices);
+      ocmh.addPropertyParams(message, params);
+
+      ocmh.sendShardRequest(nodeName, params, shardHandler, async, requestMap);
+    }
+
+    ocmh.processResponses(results, shardHandler, true, "Failed to create shard", async, requestMap, Collections.emptySet());
+
+    log.info("Finished create command on all shards for collection: " + collectionName);
+
+  }
+}
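
CreateShardCmd leans on the wait-and-poll idiom that recurs throughout this patch: offer a message to the Overseer's state-update queue, then poll the locally cached cluster state until the change becomes visible or a TimeOut expires. A sketch of the idiom in isolation (the BooleanSupplier is a hypothetical stand-in for a check such as "the new slice is visible"):

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    import org.apache.solr.common.SolrException;
    import org.apache.solr.util.TimeOut;

    // Generic form of the polling loops above; sleeps between checks because
    // the cached cluster state is refreshed by watchers, not by this thread.
    static void waitForStateChange(BooleanSupplier condition, String what) throws InterruptedException {
      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
      while (!timeout.hasTimedOut()) {
        Thread.sleep(100);
        if (condition.getAsBoolean()) return;
      }
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timed out waiting for " + what);
    }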

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java
new file mode 100644
index 0000000..7b1993c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteAliasCmd.java
@@ -0,0 +1,95 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Aliases;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class DeleteAliasCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteAliasCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String aliasName = message.getStr(NAME);
+
+    Map<String,Map<String,String>> newAliasesMap = new HashMap<>();
+    Map<String,String> newCollectionAliasesMap = new HashMap<>();
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    newCollectionAliasesMap.putAll(zkStateReader.getAliases().getCollectionAliasMap());
+    newCollectionAliasesMap.remove(aliasName);
+    newAliasesMap.put("collection", newCollectionAliasesMap);
+    Aliases newAliases = new Aliases(newAliasesMap);
+    byte[] jsonBytes = null;
+    if (newAliases.collectionAliasSize() > 0) { // "collection" is the only alias sub-map right now
+      jsonBytes = Utils.toJSON(newAliases.getAliasMap());
+    }
+    try {
+      zkStateReader.getZkClient().setData(ZkStateReader.ALIASES,
+          jsonBytes, true);
+      checkForAliasAbsence(aliasName);
+      // brief pause so other nodes have a chance to observe the change
+      Thread.sleep(100);
+    } catch (KeeperException e) {
+      log.error("Error deleting alias " + aliasName, e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    } catch (InterruptedException e) {
+      log.warn("Interrupted while deleting alias " + aliasName, e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+
+  }
+
+  private void checkForAliasAbsence(String name) throws InterruptedException {
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+    boolean success = false;
+    while (!timeout.hasTimedOut()) {
+      Aliases aliases = ocmh.zkStateReader.getAliases();
+      if (aliases.getCollectionAlias(name) == null) {
+        success = true;
+        break;
+      }
+      Thread.sleep(100); // poll; don't busy-spin on the cached aliases
+    }
+    if (!success) {
+      log.warn("Timeout waiting to be notified of alias change...");
+    }
+  }
+}
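
DeleteAliasCmd does not delete a per-alias node; it rewrites the whole aliases map and writes it back through a single setData call on the ZkStateReader.ALIASES znode. The map transformation on its own looks roughly like this (a pure-Java sketch, assuming the two-level category -> (alias -> collections) shape the code above implies):

    import java.util.HashMap;
    import java.util.Map;

    // Copy the "collection" sub-map, drop the alias, rebuild the outer map.
    static Map<String, Map<String, String>> removeAlias(Map<String, String> collectionAliases,
                                                        String aliasName) {
      Map<String, String> updated = new HashMap<>(collectionAliases);
      updated.remove(aliasName);
      Map<String, Map<String, String>> newAliases = new HashMap<>();
      newAliases.put("collection", updated); // the only alias category today
      return newAliases;
    }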

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
new file mode 100644
index 0000000..4c5ae00
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteCollectionCmd.java
@@ -0,0 +1,121 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.common.NonExistentCoreException;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+public class DeleteCollectionCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteCollectionCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    final String collection = message.getStr(NAME);
+    try {
+      if (zkStateReader.getClusterState().getCollectionOrNull(collection) == null) {
+        if (zkStateReader.getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+          // if the collection is not in the clusterstate, but is listed in zk, do nothing, it will just
+          // be removed in the finally - we cannot continue, because the below code will error if the collection
+          // is not in the clusterstate
+          return;
+        }
+      }
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, true);
+      params.set(CoreAdminParams.DELETE_DATA_DIR, true);
+
+      String asyncId = message.getStr(ASYNC);
+      Map<String, String> requestMap = null;
+      if (asyncId != null) {
+        requestMap = new HashMap<>();
+      }
+
+      Set<String> okayExceptions = new HashSet<>(1);
+      okayExceptions.add(NonExistentCoreException.class.getName());
+
+      ocmh.collectionCmd(message, params, results, null, asyncId, requestMap, okayExceptions);
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETE.toLower(), NAME, collection);
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+      // wait for a while until we don't see the collection
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+      boolean removed = false;
+      while (! timeout.hasTimedOut()) {
+        Thread.sleep(100);
+        removed = !zkStateReader.getClusterState().hasCollection(collection);
+        if (removed) {
+          Thread.sleep(500); // a bit of extra time so other readers are
+          // more likely to see the removal on return
+          break;
+        }
+      }
+      if (!removed) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not fully remove collection: " + collection);
+      }
+
+    } finally {
+
+      try {
+        if (zkStateReader.getZkClient().exists(
+            ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection, true)) {
+          zkStateReader.getZkClient().clean(
+              ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection);
+        }
+      } catch (InterruptedException e) {
+        SolrException.log(log, "Cleaning up collection in zk was interrupted:"
+            + collection, e);
+        Thread.currentThread().interrupt();
+      } catch (KeeperException e) {
+        SolrException.log(log, "Problem cleaning up collection in zk:"
+            + collection, e);
+      }
+    }
+  }
+}
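
Note the two-phase shape DeleteCollectionCmd shares with DeleteShardCmd: unload the cores over HTTP first, then drive the clusterstate change through the Overseer's state-update queue and poll until it takes effect. A sketch of the queue-producer half, with names matching the code above (error handling elided):

    import org.apache.solr.cloud.Overseer;
    import org.apache.solr.common.cloud.SolrZkClient;
    import org.apache.solr.common.cloud.ZkNodeProps;
    import org.apache.solr.common.util.Utils;

    // The Overseer consumes these JSON messages and applies the matching
    // clusterstate change; "delete" is DELETE.toLower() in the patch.
    static void enqueueCollectionDelete(SolrZkClient zkClient, String collection) throws Exception {
      ZkNodeProps m = new ZkNodeProps(
          Overseer.QUEUE_OPERATION, "delete",
          "name", collection); // CommonParams.NAME
      Overseer.getStateUpdateQueue(zkClient).offer(Utils.toJSON(m));
    }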

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
index 3e60090..b3c5055 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteNodeCmd.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
 
 public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -64,7 +65,7 @@ public class DeleteNodeCmd implements OverseerCollectionMessageHandler.Cmd {
       log.info("Deleting replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), node);
       NamedList deleteResult = new NamedList();
       try {
-        ocmh.deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
+        ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, sourceReplica.plus("parallel", "true"), deleteResult, () -> {
           cleanupLatch.countDown();
           if (deleteResult.get("failure") != null) {
             synchronized (results) {
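
This hunk is the seam of the whole refactoring: instead of invoking a method on the monolithic handler, DeleteNodeCmd now looks its sibling up in the handler's new commandMap and casts to reach the richer deleteReplica(...) overload. Sketched below; the map's exact type and how it is populated are assumptions, since only the lookup appears in the hunk:

    import java.util.Map;

    import org.apache.solr.common.params.CollectionParams.CollectionAction;

    // The cast is needed because DeleteNodeCmd calls the four-argument
    // deleteReplica(...) that only DeleteReplicaCmd exposes, not Cmd.call(...).
    static DeleteReplicaCmd deleteReplicaCmd(
        Map<CollectionAction, OverseerCollectionMessageHandler.Cmd> commandMap) {
      return (DeleteReplicaCmd) commandMap.get(CollectionAction.DELETEREPLICA);
    }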

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
new file mode 100644
index 0000000..6f5fc62
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteReplicaCmd.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_IF_DOWN;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+
+public class DeleteReplicaCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    deleteReplica(clusterState, message, results, null);
+  }
+
+  @SuppressWarnings("unchecked")
+  void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+      throws KeeperException, InterruptedException {
+    ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String replicaName = message.getStr(REPLICA_PROP);
+    boolean parallel = message.getBool("parallel", false);
+
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Slice slice = coll.getSlice(shard);
+    if (slice == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Invalid shard name : " + shard + " in collection : " + collectionName);
+    }
+    Replica replica = slice.getReplica(replicaName);
+    if (replica == null) {
+      ArrayList<String> l = new ArrayList<>();
+      for (Replica r : slice.getReplicas())
+        l.add(r.getName());
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " + replicaName + " in shard/collection : "
+          + shard + "/" + collectionName + " available replicas are " + StrUtils.join(l, ','));
+    }
+
+    // If users are being safe and only want to remove a replica if it is down, they can specify
+    // onlyIfDown=true on the command.
+    if (Boolean.parseBoolean(message.getStr(ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "Attempted to remove replica : " + collectionName + "/" + shard + "/" + replicaName
+              + " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
+    }
+
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+    String asyncId = message.getStr(ASYNC);
+    AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
+    if (asyncId != null) {
+      requestMap.set(new HashMap<>(1, 1.0f));
+    }
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+    params.add(CoreAdminParams.CORE, core);
+
+    params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
+    params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
+    params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
+
+    boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+    if (isLive) {
+      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
+    }
+
+    Callable<Boolean> callable = () -> {
+      try {
+        if (isLive) {
+          ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
+
+          //check if the core unload removed the corenode zk entry
+          if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
+        }
+
+        // try and ensure core info is removed from cluster state
+        ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
+        if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
+        return Boolean.FALSE;
+      } catch (Exception e) {
+        results.add("failure", "Could not complete delete " + e.getMessage());
+        throw e;
+      } finally {
+        if (onComplete != null) onComplete.run();
+      }
+    };
+
+    if (!parallel) {
+      try {
+        if (!callable.call())
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+              "Could not  remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+      } catch (InterruptedException | KeeperException e) {
+        throw e;
+      } catch (Exception ex) {
+        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
+      }
+
+    } else {
+      ocmh.tpe.submit(callable);
+    }
+  }
+
+}
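
DeleteReplicaCmd wraps all of the slow work (wait for the unload response, then for the core-node entry to disappear) in a single Callable, so the parallel flag only chooses between running it inline and handing it to the handler's executor. Reduced to its shape (a sketch; 'work' stands in for the Callable above and 'pool' for ocmh.tpe):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;

    static void runDelete(Callable<Boolean> work, ExecutorService pool, boolean parallel) throws Exception {
      if (parallel) {
        pool.submit(work);       // fire-and-forget; the onComplete hook reports the outcome
      } else if (!work.call()) { // serial path surfaces failure to the caller immediately
        throw new IllegalStateException("could not remove replica");
      }
    }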

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
new file mode 100644
index 0000000..f2ae5ca
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/DeleteShardCmd.java
@@ -0,0 +1,126 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class DeleteShardCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+    log.info("Delete shard invoked");
+    Slice slice = clusterState.getSlice(collectionName, sliceId);
+
+    if (slice == null) {
+      if (clusterState.hasCollection(collectionName)) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "No shard with name " + sliceId + " exists for collection " + collectionName);
+      } else {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No collection with the specified name exists: " + collectionName);
+      }
+    }
+    // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+    // TODO: Add check for range gaps on Slice deletion
+    final Slice.State state = slice.getState();
+    if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
+        || state == Slice.State.CONSTRUCTION)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
+          + ". Only non-active (or custom-hashed) slices can be deleted.");
+    }
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (asyncId != null) {
+      requestMap = new HashMap<>(slice.getReplicas().size(), 1.0f);
+    }
+
+    try {
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+      params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
+      params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
+      params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
+
+      ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+
+      ocmh.processResponses(results, shardHandler, true, "Failed to delete shard", asyncId, requestMap, Collections.emptySet());
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
+          collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
+      ZkStateReader zkStateReader = ocmh.zkStateReader;
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+      // wait for a while until we don't see the shard
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS);
+      boolean removed = false;
+      while (! timeout.hasTimedOut()) {
+        Thread.sleep(100);
+        DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+        removed = collection.getSlice(sliceId) == null;
+        if (removed) {
+          Thread.sleep(100); // just a bit of time so it's more likely other readers see on return
+          break;
+        }
+      }
+      if (!removed) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not fully remove shard: " + sliceId + " of collection: " + collectionName);
+      }
+
+      log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
+
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e);
+    }
+  }
+}
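
For reference, the guard near the top of DeleteShardCmd reduces to this predicate (same states, extracted from the condition above):

    import org.apache.solr.common.cloud.Slice;

    // Slices with no hash range are custom-hashed and may always be deleted;
    // ranged slices must be in a non-active lifecycle state.
    static boolean isDeletable(Slice slice) {
      Slice.State state = slice.getState();
      return slice.getRange() == null
          || state == Slice.State.INACTIVE
          || state == Slice.State.RECOVERY
          || state == Slice.State.CONSTRUCTION;
    }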

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/bbd1efe5/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
new file mode 100644
index 0000000..7b1ad2c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.RoutingRule;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
+public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String sourceCollectionName = message.getStr("collection");
+    String splitKey = message.getStr("split.key");
+    String targetCollectionName = message.getStr("target.collection");
+    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
+
+    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
+    if (sourceCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
+    }
+    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
+    if (targetCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + sourceCollectionName);
+    }
+    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
+    }
+    if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
+    }
+    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
+    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
+    if (sourceSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No active slices available in source collection: " + sourceCollection + "for given split.key: " + splitKey);
+    }
+    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
+    if (targetSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
+    }
+
+    String asyncId = null;
+    if (message.containsKey(ASYNC) && message.get(ASYNC) != null)
+      asyncId = message.getStr(ASYNC);
+
+    for (Slice sourceSlice : sourceSlices) {
+      for (Slice targetSlice : targetSlices) {
+        log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
+        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
+            timeout, results, asyncId, message);
+      }
+    }
+  }
+
+  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+                          DocCollection targetCollection, Slice targetSlice,
+                          String splitKey, int timeout,
+                          NamedList results, String asyncId, ZkNodeProps message) throws Exception {
+    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    if (clusterState.hasCollection(tempSourceCollectionName)) {
+      log.info("Deleting temporary collection: " + tempSourceCollectionName);
+      Map<String, Object> props = makeMap(
+          Overseer.QUEUE_OPERATION, DELETE.toLower(),
+          NAME, tempSourceCollectionName);
+
+      try {
+        ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+        clusterState = zkStateReader.getClusterState();
+      } catch (Exception e) {
+        log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
+      }
+    }
+
+    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
+
+    ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
+    // intersect source range, keyHashRange and target range
+    // this is the range that has to be split from source and transferred to target
+    DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
+    if (splitRange == null) {
+      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
+      return;
+    }
+    log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
+
+    Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+    // For tracking async calls.
+    Map<String, String> requestMap = new HashMap<>();
+
+    log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+        + targetLeader.getStr("core") + " to buffer updates");
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
+    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
+
+    ZkNodeProps m = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
+        COLLECTION_PROP, sourceCollection.getName(),
+        SHARD_ID_PROP, sourceSlice.getName(),
+        "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
+        "range", splitRange.toString(),
+        "targetCollection", targetCollection.getName(),
+        "expireAt", RoutingRule.makeExpiryAt(timeout));
+    log.info("Adding routing rule: " + m);
+    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+    // wait for a while until we see the new rule
+    log.info("Waiting to see routing rule updated in clusterstate");
+    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS);
+    boolean added = false;
+    while (!waitUntil.hasTimedOut()) {
+      Thread.sleep(100);
+      sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
+      sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
+      Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
+      if (rules != null) {
+        RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
+        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
+          added = true;
+          break;
+        }
+      }
+    }
+    if (!added) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
+    }
+
+    log.info("Routing rule added successfully");
+
+    // Create temp core on source shard
+    Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
+
+    // create a temporary collection with just one node on the shard leader
+    String configName = zkStateReader.readConfigName(sourceCollection.getName());
+    Map<String, Object> props = makeMap(
+        Overseer.QUEUE_OPERATION, CREATE.toLower(),
+        NAME, tempSourceCollectionName,
+        REPLICATION_FACTOR, 1,
+        NUM_SLICES, 1,
+        COLL_CONF, configName,
+        CREATE_NODE_SET, sourceLeader.getNodeName());
+    if (asyncId != null) {
+      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+      props.put(ASYNC, internalAsyncId);
+    }
+
+    log.info("Creating temporary collection: " + props);
+    ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
+    // refresh cluster state
+    clusterState = zkStateReader.getClusterState();
+    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
+    Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
+
+    String tempCollectionReplica1 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica1";
+    String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+        sourceLeader.getNodeName(), tempCollectionReplica1);
+    // wait for the replicas to be seen as active on temp source leader
+    log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
+    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+    cmd.setCoreName(tempCollectionReplica1);
+    cmd.setNodeName(sourceLeader.getNodeName());
+    cmd.setCoreNodeName(coreNodeName);
+    cmd.setState(Replica.State.ACTIVE);
+    cmd.setCheckLive(true);
+    cmd.setOnlyIfLeader(true);
+    // we don't want this to happen asynchronously
+    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
+        " or timed out waiting for it to come up", asyncId, requestMap);
+
+    log.info("Asking source leader to split index");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
+    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
+    params.set(CoreAdminParams.RANGES, splitRange.toString());
+    params.set("split.key", splitKey);
+
+    String tempNodeName = sourceLeader.getNodeName();
+
+    ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
+
+    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
+        tempSourceCollectionName, targetLeader.getNodeName());
+    String tempCollectionReplica2 = tempSourceCollectionName + "_" + tempSourceSlice.getName() + "_replica2";
+    props = new HashMap<>();
+    props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+    props.put(COLLECTION_PROP, tempSourceCollectionName);
+    props.put(SHARD_ID_PROP, tempSourceSlice.getName());
+    props.put("node", targetLeader.getNodeName());
+    props.put(CoreAdminParams.NAME, tempCollectionReplica2);
+    // copy over property params:
+    for (String key : message.keySet()) {
+      if (key.startsWith(COLL_PROP_PREFIX)) {
+        props.put(key, message.getStr(key));
+      }
+    }
+    // add async param
+    if (asyncId != null) {
+      props.put(ASYNC, asyncId);
+    }
+    ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
+        "temporary collection in target leader node.", asyncId, requestMap);
+
+    coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+        targetLeader.getNodeName(), tempCollectionReplica2);
+    // wait for the replicas to be seen as active on temp source leader
+    log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
+    cmd = new CoreAdminRequest.WaitForState();
+    cmd.setCoreName(tempSourceLeader.getStr("core"));
+    cmd.setNodeName(targetLeader.getNodeName());
+    cmd.setCoreNodeName(coreNodeName);
+    cmd.setState(Replica.State.ACTIVE);
+    cmd.setCheckLive(true);
+    cmd.setOnlyIfLeader(true);
+    params = new ModifiableSolrParams(cmd.getParams());
+
+    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
+        " replica or timed out waiting for them to come up", asyncId, requestMap);
+
+    log.info("Successfully created replica of temp source collection on target leader node");
+
+    log.info("Requesting merge of temp source collection replica to target leader");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
+    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
+    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
+        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
+    ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
+
+    log.info("Asking target leader to apply buffered updates");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
+        asyncId, requestMap);
+
+    try {
+      log.info("Deleting temporary collection: " + tempSourceCollectionName);
+      props = makeMap(
+          Overseer.QUEUE_OPERATION, DELETE.toLower(),
+          NAME, tempSourceCollectionName);
+      ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+    } catch (Exception e) {
+      log.error("Unable to delete temporary collection: " + tempSourceCollectionName
+          + ". Please remove it manually", e);
+    }
+  }
+}
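
A closing note on the async plumbing threaded through MigrateCmd and the other commands: when the caller supplies an ASYNC id, each sub-request gets a derived child id (the parent id plus Math.abs(System.nanoTime())), and the node -> id pairs are collected in a requestMap so that processResponses(...) can poll for their completion. In isolation (a sketch; the method name is illustrative):

    import java.util.Map;

    // Derive one child id per sub-request and remember it per node, as
    // CreateCollectionCmd and MigrateCmd do above.
    static String trackAsync(Map<String, String> requestMap, String parentAsyncId, String nodeName) {
      String childId = parentAsyncId + Math.abs(System.nanoTime());
      requestMap.put(nodeName, childId);
      return childId;
    }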