You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by va...@apache.org on 2018/01/16 20:39:50 UTC

[09/15] lucene-solr:branch_7x: SOLR-11817: Move Collections API classes to it's own package

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
new file mode 100644
index 0000000..eefe903
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteReplicaCmd.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.Cmd;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionAdminParams.COUNT_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+
+public class DeleteReplicaCmd implements Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    deleteReplica(clusterState, message, results,null);
+  }
+
+
+  @SuppressWarnings("unchecked")
+  void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+          throws KeeperException, InterruptedException {
+    log.debug("deleteReplica() : {}", Utils.toJSONString(message));
+    boolean parallel = message.getBool("parallel", false);
+
+    //If a count is specified the strategy needs be different
+    if (message.getStr(COUNT_PROP) != null) {
+      deleteReplicaBasedOnCount(clusterState, message, results, onComplete, parallel);
+      return;
+    }
+
+
+    ocmh.checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP);
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String shard = message.getStr(SHARD_ID_PROP);
+    String replicaName = message.getStr(REPLICA_PROP);
+
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Slice slice = coll.getSlice(shard);
+    if (slice == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Invalid shard name : " +  shard + " in collection : " +  collectionName);
+    }
+
+    deleteCore(slice, collectionName, replicaName, message, shard, results, onComplete,  parallel);
+
+  }
+
+
+  /**
+   * Delete replicas based on count for a given collection. If a shard is passed, uses that
+   * else deletes given num replicas across all shards for the given collection.
+   */
+  void deleteReplicaBasedOnCount(ClusterState clusterState,
+                                 ZkNodeProps message,
+                                 NamedList results,
+                                 Runnable onComplete,
+                                 boolean parallel)
+          throws KeeperException, InterruptedException {
+    ocmh.checkRequired(message, COLLECTION_PROP, COUNT_PROP);
+    int count = Integer.parseInt(message.getStr(COUNT_PROP));
+    String collectionName = message.getStr(COLLECTION_PROP);
+    String shard = message.getStr(SHARD_ID_PROP);
+    DocCollection coll = clusterState.getCollection(collectionName);
+    Slice slice = null;
+    //Validate if shard is passed.
+    if (shard != null) {
+      slice = coll.getSlice(shard);
+      if (slice == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+                "Invalid shard name : " +  shard +  " in collection : " + collectionName);
+      }
+    }
+
+    Map<Slice, Set<String>> shardToReplicasMapping = new HashMap<Slice, Set<String>>();
+    if (slice != null) {
+      Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(slice, shard, collectionName, count);
+      shardToReplicasMapping.put(slice,replicasToBeDeleted);
+    } else {
+
+      //If there are many replicas left, remove the rest based on count.
+      Collection<Slice> allSlices = coll.getSlices();
+      for (Slice individualSlice : allSlices) {
+        Set<String> replicasToBeDeleted = pickReplicasTobeDeleted(individualSlice, individualSlice.getName(), collectionName, count);
+        shardToReplicasMapping.put(individualSlice, replicasToBeDeleted);
+      }
+    }
+
+    for (Slice shardSlice: shardToReplicasMapping.keySet()) {
+      String shardId = shardSlice.getName();
+      Set<String> replicas = shardToReplicasMapping.get(shardSlice);
+      //callDeleteReplica on all replicas
+      for (String replica: replicas) {
+        log.debug("Deleting replica {}  for shard {} based on count {}", replica, shardId, count);
+        deleteCore(shardSlice, collectionName, replica, message, shard, results, onComplete, parallel);
+      }
+      results.add("shard_id", shardId);
+      results.add("replicas_deleted", replicas);
+    }
+
+  }
+
+
+  /**
+   * Pick replicas to be deleted. Avoid picking the leader.
+   */
+  private Set<String> pickReplicasTobeDeleted(Slice slice, String shard, String collectionName, int count) {
+    validateReplicaAvailability(slice, shard, collectionName, count);
+    Collection<Replica> allReplicas = slice.getReplicas();
+    Set<String> replicasToBeRemoved = new HashSet<String>();
+    Replica leader = slice.getLeader();
+    for (Replica replica: allReplicas) {
+      if (count == 0) {
+        break;
+      }
+      //Try avoiding to pick up the leader to minimize activity on the cluster.
+      if (leader.getCoreName().equals(replica.getCoreName())) {
+        continue;
+      }
+      replicasToBeRemoved.add(replica.getName());
+      count --;
+    }
+    return replicasToBeRemoved;
+  }
+
+  /**
+   * Validate if there is less replicas than requested to remove. Also error out if there is
+   * only one replica available
+   */
+  private void validateReplicaAvailability(Slice slice, String shard, String collectionName, int count) {
+    //If there is a specific shard passed, validate if there any or just 1 replica left
+    if (slice != null) {
+      Collection<Replica> allReplicasForShard = slice.getReplicas();
+      if (allReplicasForShard == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No replicas found  in shard/collection: " +
+                shard + "/"  + collectionName);
+      }
+
+
+      if (allReplicasForShard.size() == 1) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There is only one replica available in shard/collection: " +
+                shard + "/" + collectionName + ". Cannot delete that.");
+      }
+
+      if (allReplicasForShard.size() <= count) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "There are lesser num replicas requested to be deleted than are available in shard/collection : " +
+                shard + "/"  + collectionName  + " Requested: "  + count + " Available: " + allReplicasForShard.size() + ".");
+      }
+    }
+  }
+
+  void deleteCore(Slice slice, String collectionName, String replicaName,ZkNodeProps message, String shard, NamedList results, Runnable onComplete, boolean parallel) throws KeeperException, InterruptedException {
+
+    Replica replica = slice.getReplica(replicaName);
+    if (replica == null) {
+      ArrayList<String> l = new ArrayList<>();
+      for (Replica r : slice.getReplicas())
+        l.add(r.getName());
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid replica : " +  replicaName + " in shard/collection : " +
+              shard  + "/" + collectionName + " available replicas are " +  StrUtils.join(l, ','));
+    }
+
+    // If users are being safe and only want to remove a shard if it is down, they can specify onlyIfDown=true
+    // on the command.
+    if (Boolean.parseBoolean(message.getStr(OverseerCollectionMessageHandler.ONLY_IF_DOWN)) && replica.getState() != Replica.State.DOWN) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Attempted to remove replica : " + collectionName + "/"  + shard + "/" + replicaName +
+              " with onlyIfDown='true', but state is '" + replica.getStr(ZkStateReader.STATE_PROP) + "'");
+    }
+
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+    String asyncId = message.getStr(ASYNC);
+    AtomicReference<Map<String, String>> requestMap = new AtomicReference<>(null);
+    if (asyncId != null) {
+      requestMap.set(new HashMap<>(1, 1.0f));
+    }
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.add(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.UNLOAD.toString());
+    params.add(CoreAdminParams.CORE, core);
+
+    params.set(CoreAdminParams.DELETE_INDEX, message.getBool(CoreAdminParams.DELETE_INDEX, true));
+    params.set(CoreAdminParams.DELETE_INSTANCE_DIR, message.getBool(CoreAdminParams.DELETE_INSTANCE_DIR, true));
+    params.set(CoreAdminParams.DELETE_DATA_DIR, message.getBool(CoreAdminParams.DELETE_DATA_DIR, true));
+
+    boolean isLive = ocmh.zkStateReader.getClusterState().getLiveNodes().contains(replica.getNodeName());
+    if (isLive) {
+      ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap.get());
+    }
+
+    Callable<Boolean> callable = () -> {
+      try {
+        if (isLive) {
+          ocmh.processResponses(results, shardHandler, false, null, asyncId, requestMap.get());
+
+          //check if the core unload removed the corenode zk entry
+          if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 5000)) return Boolean.TRUE;
+        }
+
+        // try and ensure core info is removed from cluster state
+        ocmh.deleteCoreNode(collectionName, replicaName, replica, core);
+        if (ocmh.waitForCoreNodeGone(collectionName, shard, replicaName, 30000)) return Boolean.TRUE;
+        return Boolean.FALSE;
+      } catch (Exception e) {
+        results.add("failure", "Could not complete delete " + e.getMessage());
+        throw e;
+      } finally {
+        if (onComplete != null) onComplete.run();
+      }
+    };
+
+    if (!parallel) {
+      try {
+        if (!callable.call())
+          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+                  "Could not remove replica : " + collectionName + "/" + shard + "/" + replicaName);
+      } catch (InterruptedException | KeeperException e) {
+        throw e;
+      } catch (Exception ex) {
+        throw new SolrException(SolrException.ErrorCode.UNKNOWN, "Error waiting for corenode gone", ex);
+      }
+
+    } else {
+      ocmh.tpe.submit(callable);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
new file mode 100644
index 0000000..2ef2955
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -0,0 +1,178 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+
+  public DeleteShardCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    String sliceId = message.getStr(ZkStateReader.SHARD_ID_PROP);
+
+    log.info("Delete shard invoked");
+    Slice slice = clusterState.getCollection(collectionName).getSlice(sliceId);
+    if (slice == null) throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+        "No shard with name " + sliceId + " exists for collection " + collectionName);
+
+    // For now, only allow for deletions of Inactive slices or custom hashes (range==null).
+    // TODO: Add check for range gaps on Slice deletion
+    final Slice.State state = slice.getState();
+    if (!(slice.getRange() == null || state == Slice.State.INACTIVE || state == Slice.State.RECOVERY
+        || state == Slice.State.CONSTRUCTION) || state == Slice.State.RECOVERY_FAILED) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The slice: " + slice.getName() + " is currently " + state
+          + ". Only non-active (or custom-hashed) slices can be deleted.");
+    }
+
+    if (state == Slice.State.RECOVERY)  {
+      // mark the slice as 'construction' and only then try to delete the cores
+      // see SOLR-9455
+      DistributedQueue inQueue = Overseer.getStateUpdateQueue(ocmh.zkStateReader.getZkClient());
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+      propMap.put(sliceId, Slice.State.CONSTRUCTION.toString());
+      propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+      ZkNodeProps m = new ZkNodeProps(propMap);
+      inQueue.offer(Utils.toJSON(m));
+    }
+
+    String asyncId = message.getStr(ASYNC);
+
+    try {
+      List<ZkNodeProps> replicas = getReplicasForSlice(collectionName, slice);
+      CountDownLatch cleanupLatch = new CountDownLatch(replicas.size());
+      for (ZkNodeProps r : replicas) {
+        final ZkNodeProps replica = r.plus(message.getProperties()).plus("parallel", "true").plus(ASYNC, asyncId);
+        log.info("Deleting replica for collection={} shard={} on node={}", replica.getStr(COLLECTION_PROP), replica.getStr(SHARD_ID_PROP), replica.getStr(CoreAdminParams.NODE));
+        NamedList deleteResult = new NamedList();
+        try {
+          ((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, replica, deleteResult, () -> {
+            cleanupLatch.countDown();
+            if (deleteResult.get("failure") != null) {
+              synchronized (results) {
+                results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
+                    " on node=%s", replica.getStr(COLLECTION_PROP), replica.getStr(SHARD_ID_PROP), replica.getStr(NODE_NAME_PROP)));
+              }
+            }
+            SimpleOrderedMap success = (SimpleOrderedMap) deleteResult.get("success");
+            if (success != null) {
+              synchronized (results)  {
+                results.add("success", success);
+              }
+            }
+          });
+        } catch (KeeperException e) {
+          log.warn("Error deleting replica: " + r, e);
+          cleanupLatch.countDown();
+        } catch (Exception e) {
+          log.warn("Error deleting replica: " + r, e);
+          cleanupLatch.countDown();
+          throw e;
+        }
+      }
+      log.debug("Waiting for delete shard action to complete");
+      cleanupLatch.await(5, TimeUnit.MINUTES);
+
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
+          collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
+      ZkStateReader zkStateReader = ocmh.zkStateReader;
+      Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+      // wait for a while until we don't see the shard
+      TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+      boolean removed = false;
+      while (!timeout.hasTimedOut()) {
+        timeout.sleep(100);
+        DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
+        removed = collection.getSlice(sliceId) == null;
+        if (removed) {
+          timeout.sleep(100); // just a bit of time so it's more likely other readers see on return
+          break;
+        }
+      }
+      if (!removed) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Could not fully remove collection: " + collectionName + " shard: " + sliceId);
+      }
+
+      log.info("Successfully deleted collection: " + collectionName + ", shard: " + sliceId);
+    } catch (SolrException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+          "Error executing delete operation for collection: " + collectionName + " shard: " + sliceId, e);
+    }
+  }
+
+  private List<ZkNodeProps> getReplicasForSlice(String collectionName, Slice slice) {
+    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+    for (Replica replica : slice.getReplicas()) {
+      ZkNodeProps props = new ZkNodeProps(
+          COLLECTION_PROP, collectionName,
+          SHARD_ID_PROP, slice.getName(),
+          ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
+          ZkStateReader.REPLICA_PROP, replica.getName(),
+          CoreAdminParams.NODE, replica.getNodeName());
+      sourceReplicas.add(props);
+    }
+    return sourceReplicas;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
new file mode 100644
index 0000000..cf0a234
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteSnapshotCmd.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
+import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
+import org.apache.solr.core.snapshots.SolrSnapshotManager;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the functionality of deleting a collection level snapshot.
+ */
+public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public DeleteSnapshotCmd (OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    String collectionName =  message.getStr(COLLECTION_PROP);
+    String commitName =  message.getStr(CoreAdminParams.COMMIT_NAME);
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = new HashMap<>();
+    NamedList shardRequestResults = new NamedList();
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+
+    Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
+    if (!meta.isPresent()) { // Snapshot not found. Nothing to do.
+      return;
+    }
+
+    log.info("Deleting a snapshot for collection={} with commitName={}", collectionName, commitName);
+
+    Set<String> existingCores = new HashSet<>();
+    for (Slice s : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica r : s.getReplicas()) {
+        existingCores.add(r.getCoreName());
+      }
+    }
+
+    Set<String> coresWithSnapshot = new HashSet<>();
+    for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
+      if (existingCores.contains(m.getCoreName())) {
+        coresWithSnapshot.add(m.getCoreName());
+      }
+    }
+
+    log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
+    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        if (replica.getState() == State.DOWN) {
+          continue; // Since replica is down - no point sending a request.
+        }
+
+        // Note - when a snapshot is found in_progress state - it is the result of overseer
+        // failure while handling the snapshot creation. Since we don't know the exact set of
+        // replicas to contact at this point, we try on all replicas.
+        if (meta.get().getStatus() == SnapshotStatus.InProgress || coresWithSnapshot.contains(replica.getCoreName())) {
+          String coreName = replica.getStr(CORE_NAME_PROP);
+
+          ModifiableSolrParams params = new ModifiableSolrParams();
+          params.set(CoreAdminParams.ACTION, CoreAdminAction.DELETESNAPSHOT.toString());
+          params.set(NAME, slice.getName());
+          params.set(CORE_NAME_PROP, coreName);
+          params.set(CoreAdminParams.COMMIT_NAME, commitName);
+
+          log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName);
+          ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
+        }
+      }
+    }
+
+    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
+    NamedList success = (NamedList) shardRequestResults.get("success");
+    List<CoreSnapshotMetaData> replicas = new ArrayList<>();
+    if (success != null) {
+      for ( int i = 0 ; i < success.size() ; i++) {
+        NamedList resp = (NamedList)success.getVal(i);
+        // Unfortunately async processing logic doesn't provide the "core" name automatically.
+        String coreName = (String)resp.get("core");
+        coresWithSnapshot.remove(coreName);
+      }
+    }
+
+    if (!coresWithSnapshot.isEmpty()) { // One or more failures.
+      log.warn("Failed to delete a snapshot for collection {} with commitName = {}. Snapshot could not be deleted for following cores {}",
+          collectionName, commitName, coresWithSnapshot);
+
+      List<CoreSnapshotMetaData> replicasWithSnapshot = new ArrayList<>();
+      for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
+        if (coresWithSnapshot.contains(m.getCoreName())) {
+          replicasWithSnapshot.add(m);
+        }
+      }
+
+      // Update the ZK meta-data to include only cores with the snapshot. This will enable users to figure out
+      // which cores still contain the named snapshot.
+      CollectionSnapshotMetaData newResult = new CollectionSnapshotMetaData(meta.get().getName(), SnapshotStatus.Failed,
+          meta.get().getCreationDate(), replicasWithSnapshot);
+      SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, newResult);
+      log.info("Saved snapshot information for collection={} with commitName={} in Zookeeper as follows", collectionName, commitName,
+          Utils.toJSON(newResult));
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to delete snapshot on cores " + coresWithSnapshot);
+
+    } else {
+      // Delete the ZK path so that we eliminate the references of this snapshot from collection level meta-data.
+      SolrSnapshotManager.deleteCollectionLevelSnapshot(zkClient, collectionName, commitName);
+      log.info("Deleted Zookeeper snapshot metdata for collection={} with commitName={}", collectionName, commitName);
+      log.info("Successfully deleted snapshot for collection={} with commitName={}", collectionName, commitName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java
new file mode 100644
index 0000000..a80fdc0
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/LeaderRecoveryWatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.util.Set;
+
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+
+/**
+ * We use this watcher to wait for any eligible replica in a shard to become active so that it can become a leader.
+ */
+public class LeaderRecoveryWatcher implements CollectionStateWatcher {
+  String collectionId;
+  String shardId;
+  String replicaId;
+  String targetCore;
+  SolrCloseableLatch latch;
+
+  /**
+   * Watch for recovery of a replica
+   *
+   * @param collectionId   collection name
+   * @param shardId        shard id
+   * @param replicaId      source replica name (coreNodeName)
+   * @param targetCore     specific target core name - if null then any active replica will do
+   * @param latch countdown when recovered
+   */
+  LeaderRecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, SolrCloseableLatch latch) {
+    this.collectionId = collectionId;
+    this.shardId = shardId;
+    this.replicaId = replicaId;
+    this.targetCore = targetCore;
+    this.latch = latch;
+  }
+
+  @Override
+  public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+    if (collectionState == null) { // collection has been deleted - don't wait
+      latch.countDown();
+      return true;
+    }
+    Slice slice = collectionState.getSlice(shardId);
+    if (slice == null) { // shard has been removed - don't wait
+      latch.countDown();
+      return true;
+    }
+    for (Replica replica : slice.getReplicas()) {
+      // check if another replica exists - doesn't have to be the one we're moving
+      // as long as it's active and can become a leader, in which case we don't have to wait
+      // for recovery of specifically the one that we've just added
+      if (!replica.getName().equals(replicaId)) {
+        if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
+          continue;
+        }
+        // check its state
+        String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+        if (targetCore != null && !targetCore.equals(coreName)) {
+          continue;
+        }
+        if (replica.isActive(liveNodes)) { // recovered - stop waiting
+          latch.countDown();
+          return true;
+        }
+      }
+    }
+    // set the watch again to wait for the new replica to recover
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
new file mode 100644
index 0000000..4edc363
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.request.CoreAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CompositeIdRouter;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.RoutingRule;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.update.SolrIndexSplitter;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
+public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+
+  public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+
+  @Override
+  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    String sourceCollectionName = message.getStr("collection");
+    String splitKey = message.getStr("split.key");
+    String targetCollectionName = message.getStr("target.collection");
+    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
+
+    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
+    if (sourceCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
+    }
+    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
+    if (targetCollection == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + sourceCollectionName);
+    }
+    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
+    }
+    if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
+    }
+    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
+    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
+    if (sourceSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No active slices available in source collection: " + sourceCollection + "for given split.key: " + splitKey);
+    }
+    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
+    if (targetSlices.isEmpty()) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
+    }
+
+    String asyncId = null;
+    if (message.containsKey(ASYNC) && message.get(ASYNC) != null)
+      asyncId = message.getStr(ASYNC);
+
+    for (Slice sourceSlice : sourceSlices) {
+      for (Slice targetSlice : targetSlices) {
+        log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
+        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
+            timeout, results, asyncId, message);
+      }
+    }
+  }
+
+  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
+                          DocCollection targetCollection, Slice targetSlice,
+                          String splitKey, int timeout,
+                          NamedList results, String asyncId, ZkNodeProps message) throws Exception {
+    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    if (clusterState.hasCollection(tempSourceCollectionName)) {
+      log.info("Deleting temporary collection: " + tempSourceCollectionName);
+      Map<String, Object> props = makeMap(
+          Overseer.QUEUE_OPERATION, DELETE.toLower(),
+          NAME, tempSourceCollectionName);
+
+      try {
+        ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+        clusterState = zkStateReader.getClusterState();
+      } catch (Exception e) {
+        log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
+      }
+    }
+
+    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
+    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
+
+    ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+
+    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
+    // intersect source range, keyHashRange and target range
+    // this is the range that has to be split from source and transferred to target
+    DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
+    if (splitRange == null) {
+      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
+      return;
+    }
+    log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
+
+    Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
+    // For tracking async calls.
+    Map<String, String> requestMap = new HashMap<>();
+
+    log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
+        + targetLeader.getStr("core") + " to buffer updates");
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
+    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
+
+    ZkNodeProps m = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
+        COLLECTION_PROP, sourceCollection.getName(),
+        SHARD_ID_PROP, sourceSlice.getName(),
+        "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
+        "range", splitRange.toString(),
+        "targetCollection", targetCollection.getName(),
+        "expireAt", RoutingRule.makeExpiryAt(timeout));
+    log.info("Adding routing rule: " + m);
+    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+
+    // wait for a while until we see the new rule
+    log.info("Waiting to see routing rule updated in clusterstate");
+    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS, timeSource);
+    boolean added = false;
+    while (!waitUntil.hasTimedOut()) {
+      waitUntil.sleep(100);
+      sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
+      sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
+      Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
+      if (rules != null) {
+        RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
+        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
+          added = true;
+          break;
+        }
+      }
+    }
+    if (!added) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
+    }
+
+    log.info("Routing rule added successfully");
+
+    // Create temp core on source shard
+    Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
+
+    // create a temporary collection with just one node on the shard leader
+    String configName = zkStateReader.readConfigName(sourceCollection.getName());
+    Map<String, Object> props = makeMap(
+        Overseer.QUEUE_OPERATION, CREATE.toLower(),
+        NAME, tempSourceCollectionName,
+        NRT_REPLICAS, 1,
+        OverseerCollectionMessageHandler.NUM_SLICES, 1,
+        OverseerCollectionMessageHandler.COLL_CONF, configName,
+        OverseerCollectionMessageHandler.CREATE_NODE_SET, sourceLeader.getNodeName());
+    if (asyncId != null) {
+      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
+      props.put(ASYNC, internalAsyncId);
+    }
+
+    log.info("Creating temporary collection: " + props);
+    ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
+    // refresh cluster state
+    clusterState = zkStateReader.getClusterState();
+    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
+    Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
+
+    String tempCollectionReplica1 = tempSourceLeader.getCoreName();
+    String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+        sourceLeader.getNodeName(), tempCollectionReplica1);
+    // wait for the replicas to be seen as active on temp source leader
+    log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
+    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
+    cmd.setCoreName(tempCollectionReplica1);
+    cmd.setNodeName(sourceLeader.getNodeName());
+    cmd.setCoreNodeName(coreNodeName);
+    cmd.setState(Replica.State.ACTIVE);
+    cmd.setCheckLive(true);
+    cmd.setOnlyIfLeader(true);
+    // we don't want this to happen asynchronously
+    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
+        " or timed out waiting for it to come up", asyncId, requestMap);
+
+    log.info("Asking source leader to split index");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
+    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
+    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
+    params.set(CoreAdminParams.RANGES, splitRange.toString());
+    params.set("split.key", splitKey);
+
+    String tempNodeName = sourceLeader.getNodeName();
+
+    ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
+
+    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
+        tempSourceCollectionName, targetLeader.getNodeName());
+    String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
+        zkStateReader.getClusterState().getCollection(tempSourceCollectionName), tempSourceSlice.getName(), Replica.Type.NRT);
+    props = new HashMap<>();
+    props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
+    props.put(COLLECTION_PROP, tempSourceCollectionName);
+    props.put(SHARD_ID_PROP, tempSourceSlice.getName());
+    props.put("node", targetLeader.getNodeName());
+    props.put(CoreAdminParams.NAME, tempCollectionReplica2);
+    // copy over property params:
+    for (String key : message.keySet()) {
+      if (key.startsWith(OverseerCollectionMessageHandler.COLL_PROP_PREFIX)) {
+        props.put(key, message.getStr(key));
+      }
+    }
+    // add async param
+    if (asyncId != null) {
+      props.put(ASYNC, asyncId);
+    }
+    ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
+        "temporary collection in target leader node.", asyncId, requestMap);
+
+    coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
+        targetLeader.getNodeName(), tempCollectionReplica2);
+    // wait for the replicas to be seen as active on temp source leader
+    log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
+    cmd = new CoreAdminRequest.WaitForState();
+    cmd.setCoreName(tempSourceLeader.getStr("core"));
+    cmd.setNodeName(targetLeader.getNodeName());
+    cmd.setCoreNodeName(coreNodeName);
+    cmd.setState(Replica.State.ACTIVE);
+    cmd.setCheckLive(true);
+    cmd.setOnlyIfLeader(true);
+    params = new ModifiableSolrParams(cmd.getParams());
+
+    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
+        " replica or timed out waiting for them to come up", asyncId, requestMap);
+
+    log.info("Successfully created replica of temp source collection on target leader node");
+
+    log.info("Requesting merge of temp source collection replica to target leader");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
+    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
+    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
+        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
+    ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
+
+    log.info("Asking target leader to apply buffered updates");
+    params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
+    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
+
+    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
+    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
+        asyncId, requestMap);
+
+    try {
+      log.info("Deleting temporary collection: " + tempSourceCollectionName);
+      props = makeMap(
+          Overseer.QUEUE_OPERATION, DELETE.toLower(),
+          NAME, tempSourceCollectionName);
+      ocmh.commandMap.get(DELETE). call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
+    } catch (Exception e) {
+      log.error("Unable to delete temporary collection: " + tempSourceCollectionName
+          + ". Please remove it manually", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1c6cc20e/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
new file mode 100644
index 0000000..f9392b5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MoveReplicaCmd.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.TimeOut;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SKIP_CREATE_REPLICA_IN_CLUSTER_STATE;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
+import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
+import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
+
+public class MoveReplicaCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+  private final TimeSource timeSource;
+
+  public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+    this.timeSource = ocmh.cloudManager.getTimeSource();
+  }
+
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    moveReplica(ocmh.zkStateReader.getClusterState(), message, results);
+  }
+
+  private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    log.debug("moveReplica() : {}", Utils.toJSONString(message));
+    ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE);
+    String collection = message.getStr(COLLECTION_PROP);
+    String targetNode = message.getStr(CollectionParams.TARGET_NODE);
+    boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
+    boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true);
+    int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
+
+    String async = message.getStr(ASYNC);
+
+    DocCollection coll = clusterState.getCollection(collection);
+    if (coll == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
+    }
+    if (!clusterState.getLiveNodes().contains(targetNode)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes());
+    }
+    Replica replica = null;
+    if (message.containsKey(REPLICA_PROP)) {
+      String replicaName = message.getStr(REPLICA_PROP);
+      replica = coll.getReplica(replicaName);
+      if (replica == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Collection: " + collection + " replica: " + replicaName + " does not exist");
+      }
+    } else {
+      String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE));
+      if (sourceNode == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE +
+            " or '" + CollectionParams.FROM_NODE + "' is a required param");
+      }
+      String shardId = message.getStr(SHARD_ID_PROP);
+      if (shardId == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
+      }
+      Slice slice = clusterState.getCollection(collection).getSlice(shardId);
+      List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
+      Collections.shuffle(sliceReplicas, OverseerCollectionMessageHandler.RANDOM);
+      // this picks up a single random replica from the sourceNode
+      for (Replica r : slice.getReplicas()) {
+        if (r.getNodeName().equals(sourceNode)) {
+          replica = r;
+        }
+      }
+      if (replica == null) {
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+            "Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
+      }
+    }
+
+    log.info("Replica will be moved to node {}: {}", targetNode, replica);
+    Slice slice = null;
+    for (Slice s : coll.getSlices()) {
+      if (s.getReplicas().contains(replica)) {
+        slice = s;
+      }
+    }
+    assert slice != null;
+    Object dataDir = replica.get("dataDir");
+    boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null;
+
+    if (isSharedFS && inPlaceMove) {
+      log.debug("-- moveHdfsReplica");
+      moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState);
+    } else {
+      log.debug("-- moveNormalReplica (inPlaceMove=" + inPlaceMove + ", isSharedFS=" + isSharedFS);
+      moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState);
+    }
+  }
+
+  private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
+                                 DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
+    String skipCreateReplicaInClusterState = "true";
+    if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
+      skipCreateReplicaInClusterState = "false";
+      ZkNodeProps removeReplicasProps = new ZkNodeProps(
+          COLLECTION_PROP, coll.getName(),
+          SHARD_ID_PROP, slice.getName(),
+          REPLICA_PROP, replica.getName()
+      );
+      removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
+      removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
+      if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
+      NamedList deleteResult = new NamedList();
+      ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
+      if (deleteResult.get("failure") != null) {
+        String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
+            coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
+        log.warn(errorString);
+        results.add("failure", errorString);
+        return;
+      }
+
+      TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource);
+      while (!timeOut.hasTimedOut()) {
+        coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName());
+        if (coll.getReplica(replica.getName()) != null) {
+          timeOut.sleep(100);
+        } else {
+          break;
+        }
+      }
+      if (timeOut.hasTimedOut()) {
+        results.add("failure", "Still see deleted replica in clusterstate!");
+        return;
+      }
+
+    }
+
+    String ulogDir = replica.getStr(CoreAdminParams.ULOG_DIR);
+    ZkNodeProps addReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        CoreAdminParams.NODE, targetNode,
+        CoreAdminParams.CORE_NODE_NAME, replica.getName(),
+        CoreAdminParams.NAME, replica.getCoreName(),
+        WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState),
+        SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState,
+        CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)),
+        CoreAdminParams.DATA_DIR, dataDir);
+    if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
+    NamedList addResult = new NamedList();
+    try {
+      ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null);
+    } catch (Exception e) {
+      // fatal error - try rolling back
+      String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+          " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+      results.add("failure", errorString);
+      log.warn("Error adding replica " + addReplicasProps + " - trying to roll back...", e);
+      addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
+      NamedList rollback = new NamedList();
+      ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
+      if (rollback.get("failure") != null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+            + ", collection may be inconsistent: " + rollback.get("failure"));
+      }
+      return;
+    }
+    if (addResult.get("failure") != null) {
+      String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+          " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+      log.warn(errorString);
+      results.add("failure", errorString);
+      log.debug("--- trying to roll back...");
+      // try to roll back
+      addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
+      NamedList rollback = new NamedList();
+      try {
+        ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
+      } catch (Exception e) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+            + ", collection may be inconsistent!", e);
+      }
+      if (rollback.get("failure") != null) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
+            + ", collection may be inconsistent! Failure: " + rollback.get("failure"));
+      }
+      return;
+    } else {
+      String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+          "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), replica.getCoreName(), targetNode);
+      results.add("success", successString);
+    }
+  }
+
+  private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
+                                 DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
+    String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
+    ZkNodeProps addReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        CoreAdminParams.NODE, targetNode,
+        CoreAdminParams.NAME, newCoreName);
+    if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
+    NamedList addResult = new NamedList();
+    SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
+    ActiveReplicaWatcher watcher = null;
+    ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
+    log.debug("props " + props);
+    if (replica.equals(slice.getLeader()) || waitForFinalState) {
+      watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);
+      log.debug("-- registered watcher " + watcher);
+      ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
+    }
+    if (addResult.get("failure") != null) {
+      String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+          " on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
+      log.warn(errorString);
+      results.add("failure", errorString);
+      if (watcher != null) { // unregister
+        ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
+      }
+      return;
+    }
+    // wait for the other replica to be active if the source replica was a leader
+    if (watcher != null) {
+      try {
+        log.debug("Waiting for leader's replica to recover.");
+        if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+          String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
+              " on node=%s", coll.getName(), slice.getName(), targetNode);
+          log.warn(errorString);
+          results.add("failure", errorString);
+          return;
+        } else {
+          log.debug("Replica " + watcher.getActiveReplicas() + " is active - deleting the source...");
+        }
+      } finally {
+        ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
+      }
+    }
+
+    ZkNodeProps removeReplicasProps = new ZkNodeProps(
+        COLLECTION_PROP, coll.getName(),
+        SHARD_ID_PROP, slice.getName(),
+        REPLICA_PROP, replica.getName());
+    if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
+    NamedList deleteResult = new NamedList();
+    ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
+    if (deleteResult.get("failure") != null) {
+      String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
+          coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
+      log.warn(errorString);
+      results.add("failure", errorString);
+    } else {
+      String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
+          "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
+      results.add("success", successString);
+    }
+  }
+}