Posted to commits@lucene.apache.org by ab...@apache.org on 2018/01/23 13:28:08 UTC

[36/51] lucene-solr:jira/solr-11714: SOLR-11817: Move Collections API classes to their own package
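
For callers of the moved classes, the visible change in this patch is an import rewrite; a minimal sketch of the before/after, using the new package name org.apache.solr.cloud.api.collections shown in the modified files below (the class and static member are taken from those diffs):

// before this commit
import org.apache.solr.cloud.OverseerCollectionMessageHandler;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;

// after this commit
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;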

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java b/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java
deleted file mode 100644
index 765f4b9..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/DeleteSnapshotCmd.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.Replica.State;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.snapshots.CollectionSnapshotMetaData;
-import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.CoreSnapshotMetaData;
-import org.apache.solr.core.snapshots.CollectionSnapshotMetaData.SnapshotStatus;
-import org.apache.solr.core.snapshots.SolrSnapshotManager;
-import org.apache.solr.handler.component.ShardHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class implements the functionality of deleting a collection level snapshot.
- */
-public class DeleteSnapshotCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public DeleteSnapshotCmd (OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    String collectionName =  message.getStr(COLLECTION_PROP);
-    String commitName =  message.getStr(CoreAdminParams.COMMIT_NAME);
-    String asyncId = message.getStr(ASYNC);
-    Map<String, String> requestMap = new HashMap<>();
-    NamedList shardRequestResults = new NamedList();
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-    SolrZkClient zkClient = ocmh.overseer.getZkController().getZkClient();
-
-    Optional<CollectionSnapshotMetaData> meta = SolrSnapshotManager.getCollectionLevelSnapshot(zkClient, collectionName, commitName);
-    if (!meta.isPresent()) { // Snapshot not found. Nothing to do.
-      return;
-    }
-
-    log.info("Deleting a snapshot for collection={} with commitName={}", collectionName, commitName);
-
-    Set<String> existingCores = new HashSet<>();
-    for (Slice s : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
-      for (Replica r : s.getReplicas()) {
-        existingCores.add(r.getCoreName());
-      }
-    }
-
-    Set<String> coresWithSnapshot = new HashSet<>();
-    for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
-      if (existingCores.contains(m.getCoreName())) {
-        coresWithSnapshot.add(m.getCoreName());
-      }
-    }
-
-    log.info("Existing cores with snapshot for collection={} are {}", collectionName, existingCores);
-    for (Slice slice : ocmh.zkStateReader.getClusterState().getCollection(collectionName).getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        if (replica.getState() == State.DOWN) {
-          continue; // Since replica is down - no point sending a request.
-        }
-
-        // Note - when a snapshot is found in_progress state - it is the result of overseer
-        // failure while handling the snapshot creation. Since we don't know the exact set of
-        // replicas to contact at this point, we try on all replicas.
-        if (meta.get().getStatus() == SnapshotStatus.InProgress || coresWithSnapshot.contains(replica.getCoreName())) {
-          String coreName = replica.getStr(CORE_NAME_PROP);
-
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminAction.DELETESNAPSHOT.toString());
-          params.set(NAME, slice.getName());
-          params.set(CORE_NAME_PROP, coreName);
-          params.set(CoreAdminParams.COMMIT_NAME, commitName);
-
-          log.info("Sending deletesnapshot request to core={} with commitName={}", coreName, commitName);
-          ocmh.sendShardRequest(replica.getNodeName(), params, shardHandler, asyncId, requestMap);
-        }
-      }
-    }
-
-    ocmh.processResponses(shardRequestResults, shardHandler, false, null, asyncId, requestMap);
-    NamedList success = (NamedList) shardRequestResults.get("success");
-    List<CoreSnapshotMetaData> replicas = new ArrayList<>();
-    if (success != null) {
-      for ( int i = 0 ; i < success.size() ; i++) {
-        NamedList resp = (NamedList)success.getVal(i);
-        // Unfortunately async processing logic doesn't provide the "core" name automatically.
-        String coreName = (String)resp.get("core");
-        coresWithSnapshot.remove(coreName);
-      }
-    }
-
-    if (!coresWithSnapshot.isEmpty()) { // One or more failures.
-      log.warn("Failed to delete a snapshot for collection {} with commitName = {}. Snapshot could not be deleted for following cores {}",
-          collectionName, commitName, coresWithSnapshot);
-
-      List<CoreSnapshotMetaData> replicasWithSnapshot = new ArrayList<>();
-      for (CoreSnapshotMetaData m : meta.get().getReplicaSnapshots()) {
-        if (coresWithSnapshot.contains(m.getCoreName())) {
-          replicasWithSnapshot.add(m);
-        }
-      }
-
-      // Update the ZK meta-data to include only cores with the snapshot. This will enable users to figure out
-      // which cores still contain the named snapshot.
-      CollectionSnapshotMetaData newResult = new CollectionSnapshotMetaData(meta.get().getName(), SnapshotStatus.Failed,
-          meta.get().getCreationDate(), replicasWithSnapshot);
-      SolrSnapshotManager.updateCollectionLevelSnapshot(zkClient, collectionName, newResult);
-      log.info("Saved snapshot information for collection={} with commitName={} in Zookeeper as follows", collectionName, commitName,
-          Utils.toJSON(newResult));
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Failed to delete snapshot on cores " + coresWithSnapshot);
-
-    } else {
-      // Delete the ZK path so that we eliminate the references of this snapshot from collection level meta-data.
-      SolrSnapshotManager.deleteCollectionLevelSnapshot(zkClient, collectionName, commitName);
-      log.info("Deleted Zookeeper snapshot metdata for collection={} with commitName={}", collectionName, commitName);
-      log.info("Successfully deleted snapshot for collection={} with commitName={}", collectionName, commitName);
-    }
-  }
-}
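
For reference, a sketch of the overseer message this command consumes; the property names are the constants read at the top of call() above (plus an optional async id), the values are illustrative, and the command is a no-op if no snapshot metadata exists in ZooKeeper for the given commit name:

ZkNodeProps deleteSnapshotMsg = new ZkNodeProps(
    ZkStateReader.COLLECTION_PROP, "users",           // collection holding the snapshot
    CoreAdminParams.COMMIT_NAME, "snapshot_2018_01"); // collection-level snapshot name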

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
index 2faf6e9..953023f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
@@ -28,6 +28,7 @@ import java.util.Random;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.CollectionMutator;
 import org.apache.solr.cloud.overseer.SliceMutator;
@@ -39,8 +40,8 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARD_UNIQUE;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
+import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESHARDUNIQUE;
 
 // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java b/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java
deleted file mode 100644
index 1eb4873..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderRecoveryWatcher.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.util.Set;
-
-import org.apache.solr.common.SolrCloseableLatch;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkStateReader;
-
-/**
- * We use this watcher to wait for any eligible replica in a shard to become active so that it can become a leader.
- */
-public class LeaderRecoveryWatcher implements CollectionStateWatcher {
-  String collectionId;
-  String shardId;
-  String replicaId;
-  String targetCore;
-  SolrCloseableLatch latch;
-
-  /**
-   * Watch for recovery of a replica
-   *
-   * @param collectionId   collection name
-   * @param shardId        shard id
-   * @param replicaId      source replica name (coreNodeName)
-   * @param targetCore     specific target core name - if null then any active replica will do
-   * @param latch countdown when recovered
-   */
-  LeaderRecoveryWatcher(String collectionId, String shardId, String replicaId, String targetCore, SolrCloseableLatch latch) {
-    this.collectionId = collectionId;
-    this.shardId = shardId;
-    this.replicaId = replicaId;
-    this.targetCore = targetCore;
-    this.latch = latch;
-  }
-
-  @Override
-  public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
-    if (collectionState == null) { // collection has been deleted - don't wait
-      latch.countDown();
-      return true;
-    }
-    Slice slice = collectionState.getSlice(shardId);
-    if (slice == null) { // shard has been removed - don't wait
-      latch.countDown();
-      return true;
-    }
-    for (Replica replica : slice.getReplicas()) {
-      // check if another replica exists - doesn't have to be the one we're moving
-      // as long as it's active and can become a leader, in which case we don't have to wait
-      // for recovery of specifically the one that we've just added
-      if (!replica.getName().equals(replicaId)) {
-        if (replica.getType().equals(Replica.Type.PULL)) { // not eligible for leader election
-          continue;
-        }
-        // check its state
-        String coreName = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-        if (targetCore != null && !targetCore.equals(coreName)) {
-          continue;
-        }
-        if (replica.isActive(liveNodes)) { // recovered - stop waiting
-          latch.countDown();
-          return true;
-        }
-      }
-    }
-    // set the watch again to wait for the new replica to recover
-    return false;
-  }
-}
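
A minimal usage sketch for this watcher, assuming the ZkStateReader watcher registration and SolrCloseableLatch await() used by MoveReplicaCmd later in this message; the variable names and timeout are illustrative:

SolrCloseableLatch latch = new SolrCloseableLatch(1, ocmh);
LeaderRecoveryWatcher watcher =
    new LeaderRecoveryWatcher(collection, shardId, replicaId, null /* any active core */, latch);
ocmh.zkStateReader.registerCollectionStateWatcher(collection, watcher);
try {
  // onStateChanged() counts the latch down (and returns true) once an eligible replica
  // is active, or when the collection or shard disappears.
  if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
    // no eligible replica became active within the timeout
  }
} finally {
  ocmh.zkStateReader.removeCollectionStateWatcher(collection, watcher);
}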

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
deleted file mode 100644
index 02fdb5c..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/MigrateCmd.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CompositeIdRouter;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.RoutingRule;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-import org.apache.solr.update.SolrIndexSplitter;
-import org.apache.solr.util.TimeOut;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROP_PREFIX;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
-
-public class MigrateCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final OverseerCollectionMessageHandler ocmh;
-  private final TimeSource timeSource;
-
-  public MigrateCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-    this.timeSource = ocmh.cloudManager.getTimeSource();
-  }
-
-
-  @Override
-  public void call(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
-    String sourceCollectionName = message.getStr("collection");
-    String splitKey = message.getStr("split.key");
-    String targetCollectionName = message.getStr("target.collection");
-    int timeout = message.getInt("forward.timeout", 10 * 60) * 1000;
-
-    DocCollection sourceCollection = clusterState.getCollection(sourceCollectionName);
-    if (sourceCollection == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown source collection: " + sourceCollectionName);
-    }
-    DocCollection targetCollection = clusterState.getCollection(targetCollectionName);
-    if (targetCollection == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown target collection: " + sourceCollectionName);
-    }
-    if (!(sourceCollection.getRouter() instanceof CompositeIdRouter)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source collection must use a compositeId router");
-    }
-    if (!(targetCollection.getRouter() instanceof CompositeIdRouter)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target collection must use a compositeId router");
-    }
-    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
-    CompositeIdRouter targetRouter = (CompositeIdRouter) targetCollection.getRouter();
-    Collection<Slice> sourceSlices = sourceRouter.getSearchSlicesSingle(splitKey, null, sourceCollection);
-    if (sourceSlices.isEmpty()) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "No active slices available in source collection: " + sourceCollection + "for given split.key: " + splitKey);
-    }
-    Collection<Slice> targetSlices = targetRouter.getSearchSlicesSingle(splitKey, null, targetCollection);
-    if (targetSlices.isEmpty()) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "No active slices available in target collection: " + targetCollection + "for given split.key: " + splitKey);
-    }
-
-    String asyncId = null;
-    if (message.containsKey(ASYNC) && message.get(ASYNC) != null)
-      asyncId = message.getStr(ASYNC);
-
-    for (Slice sourceSlice : sourceSlices) {
-      for (Slice targetSlice : targetSlices) {
-        log.info("Migrating source shard: {} to target shard: {} for split.key = " + splitKey, sourceSlice, targetSlice);
-        migrateKey(clusterState, sourceCollection, sourceSlice, targetCollection, targetSlice, splitKey,
-            timeout, results, asyncId, message);
-      }
-    }
-  }
-
-  private void migrateKey(ClusterState clusterState, DocCollection sourceCollection, Slice sourceSlice,
-                          DocCollection targetCollection, Slice targetSlice,
-                          String splitKey, int timeout,
-                          NamedList results, String asyncId, ZkNodeProps message) throws Exception {
-    String tempSourceCollectionName = "split_" + sourceSlice.getName() + "_temp_" + targetSlice.getName();
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    if (clusterState.hasCollection(tempSourceCollectionName)) {
-      log.info("Deleting temporary collection: " + tempSourceCollectionName);
-      Map<String, Object> props = makeMap(
-          Overseer.QUEUE_OPERATION, DELETE.toLower(),
-          NAME, tempSourceCollectionName);
-
-      try {
-        ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
-        clusterState = zkStateReader.getClusterState();
-      } catch (Exception e) {
-        log.warn("Unable to clean up existing temporary collection: " + tempSourceCollectionName, e);
-      }
-    }
-
-    CompositeIdRouter sourceRouter = (CompositeIdRouter) sourceCollection.getRouter();
-    DocRouter.Range keyHashRange = sourceRouter.keyHashRange(splitKey);
-
-    ShardHandlerFactory shardHandlerFactory = ocmh.shardHandlerFactory;
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
-    log.info("Hash range for split.key: {} is: {}", splitKey, keyHashRange);
-    // intersect source range, keyHashRange and target range
-    // this is the range that has to be split from source and transferred to target
-    DocRouter.Range splitRange = ocmh.intersect(targetSlice.getRange(), ocmh.intersect(sourceSlice.getRange(), keyHashRange));
-    if (splitRange == null) {
-      log.info("No common hashes between source shard: {} and target shard: {}", sourceSlice.getName(), targetSlice.getName());
-      return;
-    }
-    log.info("Common hash range between source shard: {} and target shard: {} = " + splitRange, sourceSlice.getName(), targetSlice.getName());
-
-    Replica targetLeader = zkStateReader.getLeaderRetry(targetCollection.getName(), targetSlice.getName(), 10000);
-    // For tracking async calls.
-    Map<String, String> requestMap = new HashMap<>();
-
-    log.info("Asking target leader node: " + targetLeader.getNodeName() + " core: "
-        + targetLeader.getStr("core") + " to buffer updates");
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTBUFFERUPDATES.toString());
-    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
-
-    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to buffer updates", asyncId, requestMap);
-
-    ZkNodeProps m = new ZkNodeProps(
-        Overseer.QUEUE_OPERATION, OverseerAction.ADDROUTINGRULE.toLower(),
-        COLLECTION_PROP, sourceCollection.getName(),
-        SHARD_ID_PROP, sourceSlice.getName(),
-        "routeKey", SolrIndexSplitter.getRouteKey(splitKey) + "!",
-        "range", splitRange.toString(),
-        "targetCollection", targetCollection.getName(),
-        "expireAt", RoutingRule.makeExpiryAt(timeout));
-    log.info("Adding routing rule: " + m);
-    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-
-    // wait for a while until we see the new rule
-    log.info("Waiting to see routing rule updated in clusterstate");
-    TimeOut waitUntil = new TimeOut(60, TimeUnit.SECONDS, timeSource);
-    boolean added = false;
-    while (!waitUntil.hasTimedOut()) {
-      waitUntil.sleep(100);
-      sourceCollection = zkStateReader.getClusterState().getCollection(sourceCollection.getName());
-      sourceSlice = sourceCollection.getSlice(sourceSlice.getName());
-      Map<String, RoutingRule> rules = sourceSlice.getRoutingRules();
-      if (rules != null) {
-        RoutingRule rule = rules.get(SolrIndexSplitter.getRouteKey(splitKey) + "!");
-        if (rule != null && rule.getRouteRanges().contains(splitRange)) {
-          added = true;
-          break;
-        }
-      }
-    }
-    if (!added) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not add routing rule: " + m);
-    }
-
-    log.info("Routing rule added successfully");
-
-    // Create temp core on source shard
-    Replica sourceLeader = zkStateReader.getLeaderRetry(sourceCollection.getName(), sourceSlice.getName(), 10000);
-
-    // create a temporary collection with just one node on the shard leader
-    String configName = zkStateReader.readConfigName(sourceCollection.getName());
-    Map<String, Object> props = makeMap(
-        Overseer.QUEUE_OPERATION, CREATE.toLower(),
-        NAME, tempSourceCollectionName,
-        NRT_REPLICAS, 1,
-        NUM_SLICES, 1,
-        COLL_CONF, configName,
-        CREATE_NODE_SET, sourceLeader.getNodeName());
-    if (asyncId != null) {
-      String internalAsyncId = asyncId + Math.abs(System.nanoTime());
-      props.put(ASYNC, internalAsyncId);
-    }
-
-    log.info("Creating temporary collection: " + props);
-    ocmh.commandMap.get(CREATE).call(clusterState, new ZkNodeProps(props), results);
-    // refresh cluster state
-    clusterState = zkStateReader.getClusterState();
-    Slice tempSourceSlice = clusterState.getCollection(tempSourceCollectionName).getSlices().iterator().next();
-    Replica tempSourceLeader = zkStateReader.getLeaderRetry(tempSourceCollectionName, tempSourceSlice.getName(), 120000);
-
-    String tempCollectionReplica1 = tempSourceLeader.getCoreName();
-    String coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
-        sourceLeader.getNodeName(), tempCollectionReplica1);
-    // wait for the replicas to be seen as active on temp source leader
-    log.info("Asking source leader to wait for: " + tempCollectionReplica1 + " to be alive on: " + sourceLeader.getNodeName());
-    CoreAdminRequest.WaitForState cmd = new CoreAdminRequest.WaitForState();
-    cmd.setCoreName(tempCollectionReplica1);
-    cmd.setNodeName(sourceLeader.getNodeName());
-    cmd.setCoreNodeName(coreNodeName);
-    cmd.setState(Replica.State.ACTIVE);
-    cmd.setCheckLive(true);
-    cmd.setOnlyIfLeader(true);
-    // we don't want this to happen asynchronously
-    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), new ModifiableSolrParams(cmd.getParams()), shardHandler, null, null);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection leader" +
-        " or timed out waiting for it to come up", asyncId, requestMap);
-
-    log.info("Asking source leader to split index");
-    params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.SPLIT.toString());
-    params.set(CoreAdminParams.CORE, sourceLeader.getStr("core"));
-    params.add(CoreAdminParams.TARGET_CORE, tempSourceLeader.getStr("core"));
-    params.set(CoreAdminParams.RANGES, splitRange.toString());
-    params.set("split.key", splitKey);
-
-    String tempNodeName = sourceLeader.getNodeName();
-
-    ocmh.sendShardRequest(tempNodeName, params, shardHandler, asyncId, requestMap);
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to invoke SPLIT core admin command", asyncId, requestMap);
-
-    log.info("Creating a replica of temporary collection: {} on the target leader node: {}",
-        tempSourceCollectionName, targetLeader.getNodeName());
-    String tempCollectionReplica2 = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(),
-        zkStateReader.getClusterState().getCollection(tempSourceCollectionName), tempSourceSlice.getName(), Replica.Type.NRT);
-    props = new HashMap<>();
-    props.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
-    props.put(COLLECTION_PROP, tempSourceCollectionName);
-    props.put(SHARD_ID_PROP, tempSourceSlice.getName());
-    props.put("node", targetLeader.getNodeName());
-    props.put(CoreAdminParams.NAME, tempCollectionReplica2);
-    // copy over property params:
-    for (String key : message.keySet()) {
-      if (key.startsWith(COLL_PROP_PREFIX)) {
-        props.put(key, message.getStr(key));
-      }
-    }
-    // add async param
-    if (asyncId != null) {
-      props.put(ASYNC, asyncId);
-    }
-    ((AddReplicaCmd)ocmh.commandMap.get(ADDREPLICA)).addReplica(clusterState, new ZkNodeProps(props), results, null);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create replica of " +
-        "temporary collection in target leader node.", asyncId, requestMap);
-
-    coreNodeName = ocmh.waitForCoreNodeName(tempSourceCollectionName,
-        targetLeader.getNodeName(), tempCollectionReplica2);
-    // wait for the replicas to be seen as active on temp source leader
-    log.info("Asking temp source leader to wait for: " + tempCollectionReplica2 + " to be alive on: " + targetLeader.getNodeName());
-    cmd = new CoreAdminRequest.WaitForState();
-    cmd.setCoreName(tempSourceLeader.getStr("core"));
-    cmd.setNodeName(targetLeader.getNodeName());
-    cmd.setCoreNodeName(coreNodeName);
-    cmd.setState(Replica.State.ACTIVE);
-    cmd.setCheckLive(true);
-    cmd.setOnlyIfLeader(true);
-    params = new ModifiableSolrParams(cmd.getParams());
-
-    ocmh.sendShardRequest(tempSourceLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to create temp collection" +
-        " replica or timed out waiting for them to come up", asyncId, requestMap);
-
-    log.info("Successfully created replica of temp source collection on target leader node");
-
-    log.info("Requesting merge of temp source collection replica to target leader");
-    params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.MERGEINDEXES.toString());
-    params.set(CoreAdminParams.CORE, targetLeader.getStr("core"));
-    params.set(CoreAdminParams.SRC_CORE, tempCollectionReplica2);
-
-    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    String msg = "MIGRATE failed to merge " + tempCollectionReplica2 + " to "
-        + targetLeader.getStr("core") + " on node: " + targetLeader.getNodeName();
-    ocmh.processResponses(results, shardHandler, true, msg, asyncId, requestMap);
-
-    log.info("Asking target leader to apply buffered updates");
-    params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.REQUESTAPPLYUPDATES.toString());
-    params.set(CoreAdminParams.NAME, targetLeader.getStr("core"));
-
-    ocmh.sendShardRequest(targetLeader.getNodeName(), params, shardHandler, asyncId, requestMap);
-    ocmh.processResponses(results, shardHandler, true, "MIGRATE failed to request node to apply buffered updates",
-        asyncId, requestMap);
-
-    try {
-      log.info("Deleting temporary collection: " + tempSourceCollectionName);
-      props = makeMap(
-          Overseer.QUEUE_OPERATION, DELETE.toLower(),
-          NAME, tempSourceCollectionName);
-      ocmh.commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
-    } catch (Exception e) {
-      log.error("Unable to delete temporary collection: " + tempSourceCollectionName
-          + ". Please remove it manually", e);
-    }
-  }
-}
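
For reference, a sketch of the overseer message MigrateCmd.call() reads; the field names are the literal keys queried at the top of call() above, the values are illustrative, and both collections must use the compositeId router:

ZkNodeProps migrateMsg = new ZkNodeProps(
    "collection", "users",            // source collection
    "split.key", "tenantA!",          // route key whose hash range is migrated
    "target.collection", "users_v2",  // target collection
    "forward.timeout", "120");        // seconds the routing rule keeps forwarding updates (default 600)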

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
deleted file mode 100644
index 44493ec..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/MoveReplicaCmd.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.common.SolrCloseableLatch;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.update.UpdateLog;
-import org.apache.solr.util.TimeOut;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.*;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonAdminParams.IN_PLACE_MOVE;
-import static org.apache.solr.common.params.CommonAdminParams.TIMEOUT;
-import static org.apache.solr.common.params.CommonAdminParams.WAIT_FOR_FINAL_STATE;
-
-public class MoveReplicaCmd implements Cmd{
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-  private final TimeSource timeSource;
-
-  public MoveReplicaCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-    this.timeSource = ocmh.cloudManager.getTimeSource();
-  }
-
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    moveReplica(ocmh.zkStateReader.getClusterState(), message, results);
-  }
-
-  private void moveReplica(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
-    log.debug("moveReplica() : {}", Utils.toJSONString(message));
-    ocmh.checkRequired(message, COLLECTION_PROP, CollectionParams.TARGET_NODE);
-    String collection = message.getStr(COLLECTION_PROP);
-    String targetNode = message.getStr(CollectionParams.TARGET_NODE);
-    boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
-    boolean inPlaceMove = message.getBool(IN_PLACE_MOVE, true);
-    int timeout = message.getInt(TIMEOUT, 10 * 60); // 10 minutes
-
-    String async = message.getStr(ASYNC);
-
-    DocCollection coll = clusterState.getCollection(collection);
-    if (coll == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
-    }
-    if (!clusterState.getLiveNodes().contains(targetNode)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target node: " + targetNode + " not in live nodes: " + clusterState.getLiveNodes());
-    }
-    Replica replica = null;
-    if (message.containsKey(REPLICA_PROP)) {
-      String replicaName = message.getStr(REPLICA_PROP);
-      replica = coll.getReplica(replicaName);
-      if (replica == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Collection: " + collection + " replica: " + replicaName + " does not exist");
-      }
-    } else {
-      String sourceNode = message.getStr(CollectionParams.SOURCE_NODE, message.getStr(CollectionParams.FROM_NODE));
-      if (sourceNode == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + CollectionParams.SOURCE_NODE +
-            " or '" + CollectionParams.FROM_NODE + "' is a required param");
-      }
-      String shardId = message.getStr(SHARD_ID_PROP);
-      if (shardId == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'" + SHARD_ID_PROP + "' is a required param");
-      }
-      Slice slice = clusterState.getCollection(collection).getSlice(shardId);
-      List<Replica> sliceReplicas = new ArrayList<>(slice.getReplicas());
-      Collections.shuffle(sliceReplicas, RANDOM);
-      // this picks up a single random replica from the sourceNode
-      for (Replica r : sliceReplicas) {
-        if (r.getNodeName().equals(sourceNode)) {
-          replica = r;
-        }
-      }
-      if (replica == null) {
-        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-            "Collection: " + collection + " node: " + sourceNode + " does not have any replica belonging to shard: " + shardId);
-      }
-    }
-
-    log.info("Replica will be moved to node {}: {}", targetNode, replica);
-    Slice slice = null;
-    for (Slice s : coll.getSlices()) {
-      if (s.getReplicas().contains(replica)) {
-        slice = s;
-      }
-    }
-    assert slice != null;
-    Object dataDir = replica.get("dataDir");
-    boolean isSharedFS = replica.getBool(ZkStateReader.SHARED_STORAGE_PROP, false) && dataDir != null;
-
-    if (isSharedFS && inPlaceMove) {
-      log.debug("-- moveHdfsReplica");
-      moveHdfsReplica(clusterState, results, dataDir.toString(), targetNode, async, coll, replica, slice, timeout, waitForFinalState);
-    } else {
-      log.debug("-- moveNormalReplica (inPlaceMove=" + inPlaceMove + ", isSharedFS=" + isSharedFS);
-      moveNormalReplica(clusterState, results, targetNode, async, coll, replica, slice, timeout, waitForFinalState);
-    }
-  }
-
-  private void moveHdfsReplica(ClusterState clusterState, NamedList results, String dataDir, String targetNode, String async,
-                                 DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
-    String skipCreateReplicaInClusterState = "true";
-    if (clusterState.getLiveNodes().contains(replica.getNodeName())) {
-      skipCreateReplicaInClusterState = "false";
-      ZkNodeProps removeReplicasProps = new ZkNodeProps(
-          COLLECTION_PROP, coll.getName(),
-          SHARD_ID_PROP, slice.getName(),
-          REPLICA_PROP, replica.getName()
-      );
-      removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_DATA_DIR, false);
-      removeReplicasProps.getProperties().put(CoreAdminParams.DELETE_INDEX, false);
-      if(async!=null) removeReplicasProps.getProperties().put(ASYNC, async);
-      NamedList deleteResult = new NamedList();
-      ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
-      if (deleteResult.get("failure") != null) {
-        String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
-            coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
-        log.warn(errorString);
-        results.add("failure", errorString);
-        return;
-      }
-
-      TimeOut timeOut = new TimeOut(20L, TimeUnit.SECONDS, timeSource);
-      while (!timeOut.hasTimedOut()) {
-        coll = ocmh.zkStateReader.getClusterState().getCollection(coll.getName());
-        if (coll.getReplica(replica.getName()) != null) {
-          timeOut.sleep(100);
-        } else {
-          break;
-        }
-      }
-      if (timeOut.hasTimedOut()) {
-        results.add("failure", "Still see deleted replica in clusterstate!");
-        return;
-      }
-
-    }
-
-    String ulogDir = replica.getStr(CoreAdminParams.ULOG_DIR);
-    ZkNodeProps addReplicasProps = new ZkNodeProps(
-        COLLECTION_PROP, coll.getName(),
-        SHARD_ID_PROP, slice.getName(),
-        CoreAdminParams.NODE, targetNode,
-        CoreAdminParams.CORE_NODE_NAME, replica.getName(),
-        CoreAdminParams.NAME, replica.getCoreName(),
-        WAIT_FOR_FINAL_STATE, String.valueOf(waitForFinalState),
-        SKIP_CREATE_REPLICA_IN_CLUSTER_STATE, skipCreateReplicaInClusterState,
-        CoreAdminParams.ULOG_DIR, ulogDir.substring(0, ulogDir.lastIndexOf(UpdateLog.TLOG_NAME)),
-        CoreAdminParams.DATA_DIR, dataDir);
-    if(async!=null) addReplicasProps.getProperties().put(ASYNC, async);
-    NamedList addResult = new NamedList();
-    try {
-      ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, addResult, null);
-    } catch (Exception e) {
-      // fatal error - try rolling back
-      String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-          " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
-      results.add("failure", errorString);
-      log.warn("Error adding replica " + addReplicasProps + " - trying to roll back...", e);
-      addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
-      NamedList rollback = new NamedList();
-      ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
-      if (rollback.get("failure") != null) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
-            + ", collection may be inconsistent: " + rollback.get("failure"));
-      }
-      return;
-    }
-    if (addResult.get("failure") != null) {
-      String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-          " on node=%s, failure=%s", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
-      log.warn(errorString);
-      results.add("failure", errorString);
-      log.debug("--- trying to roll back...");
-      // try to roll back
-      addReplicasProps = addReplicasProps.plus(CoreAdminParams.NODE, replica.getNodeName());
-      NamedList rollback = new NamedList();
-      try {
-        ocmh.addReplica(ocmh.zkStateReader.getClusterState(), addReplicasProps, rollback, null);
-      } catch (Exception e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
-            + ", collection may be inconsistent!", e);
-      }
-      if (rollback.get("failure") != null) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Fatal error during MOVEREPLICA of " + replica
-            + ", collection may be inconsistent! Failure: " + rollback.get("failure"));
-      }
-      return;
-    } else {
-      String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
-          "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), replica.getCoreName(), targetNode);
-      results.add("success", successString);
-    }
-  }
-
-  private void moveNormalReplica(ClusterState clusterState, NamedList results, String targetNode, String async,
-                                 DocCollection coll, Replica replica, Slice slice, int timeout, boolean waitForFinalState) throws Exception {
-    String newCoreName = Assign.buildSolrCoreName(ocmh.overseer.getSolrCloudManager().getDistribStateManager(), coll, slice.getName(), replica.getType());
-    ZkNodeProps addReplicasProps = new ZkNodeProps(
-        COLLECTION_PROP, coll.getName(),
-        SHARD_ID_PROP, slice.getName(),
-        CoreAdminParams.NODE, targetNode,
-        CoreAdminParams.NAME, newCoreName);
-    if (async != null) addReplicasProps.getProperties().put(ASYNC, async);
-    NamedList addResult = new NamedList();
-    SolrCloseableLatch countDownLatch = new SolrCloseableLatch(1, ocmh);
-    ActiveReplicaWatcher watcher = null;
-    ZkNodeProps props = ocmh.addReplica(clusterState, addReplicasProps, addResult, null);
-    log.debug("props " + props);
-    if (replica.equals(slice.getLeader()) || waitForFinalState) {
-      watcher = new ActiveReplicaWatcher(coll.getName(), null, Collections.singletonList(newCoreName), countDownLatch);
-      log.debug("-- registered watcher " + watcher);
-      ocmh.zkStateReader.registerCollectionStateWatcher(coll.getName(), watcher);
-    }
-    if (addResult.get("failure") != null) {
-      String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-          " on node=%s, failure=", coll.getName(), slice.getName(), targetNode, addResult.get("failure"));
-      log.warn(errorString);
-      results.add("failure", errorString);
-      if (watcher != null) { // unregister
-        ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
-      }
-      return;
-    }
-    // wait for the other replica to be active if the source replica was a leader
-    if (watcher != null) {
-      try {
-        log.debug("Waiting for leader's replica to recover.");
-        if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-          String errorString = String.format(Locale.ROOT, "Timed out waiting for leader's replica to recover, collection=%s shard=%s" +
-              " on node=%s", coll.getName(), slice.getName(), targetNode);
-          log.warn(errorString);
-          results.add("failure", errorString);
-          return;
-        } else {
-          log.debug("Replica " + watcher.getActiveReplicas() + " is active - deleting the source...");
-        }
-      } finally {
-        ocmh.zkStateReader.removeCollectionStateWatcher(coll.getName(), watcher);
-      }
-    }
-
-    ZkNodeProps removeReplicasProps = new ZkNodeProps(
-        COLLECTION_PROP, coll.getName(),
-        SHARD_ID_PROP, slice.getName(),
-        REPLICA_PROP, replica.getName());
-    if (async != null) removeReplicasProps.getProperties().put(ASYNC, async);
-    NamedList deleteResult = new NamedList();
-    ocmh.deleteReplica(clusterState, removeReplicasProps, deleteResult, null);
-    if (deleteResult.get("failure") != null) {
-      String errorString = String.format(Locale.ROOT, "Failed to cleanup replica collection=%s shard=%s name=%s, failure=%s",
-          coll.getName(), slice.getName(), replica.getName(), deleteResult.get("failure"));
-      log.warn(errorString);
-      results.add("failure", errorString);
-    } else {
-      String successString = String.format(Locale.ROOT, "MOVEREPLICA action completed successfully, moved replica=%s at node=%s " +
-          "to replica=%s at node=%s", replica.getCoreName(), replica.getNodeName(), newCoreName, targetNode);
-      results.add("success", successString);
-    }
-  }
-}
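
A sketch of the two ways a MOVEREPLICA message can address the replica to move, matching the branches in moveReplica() above; the property constants are the ones used there, the values are illustrative:

// either name the replica directly ...
ZkNodeProps byReplica = new ZkNodeProps(
    ZkStateReader.COLLECTION_PROP, "users",
    CollectionParams.TARGET_NODE, "host2:8983_solr",
    ZkStateReader.REPLICA_PROP, "core_node5");

// ... or name a source node and shard, and let the command pick one of that node's replicas of the shard
ZkNodeProps byNode = new ZkNodeProps(
    ZkStateReader.COLLECTION_PROP, "users",
    CollectionParams.TARGET_NODE, "host2:8983_solr",
    CollectionParams.SOURCE_NODE, "host1:8983_solr",
    ZkStateReader.SHARD_ID_PROP, "shard1");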

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/Overseer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index ee5fb18..edf3838 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import com.codahale.metrics.Timer;
 import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.ClusterStateProvider;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.autoscaling.OverseerTriggerThread;
 import org.apache.solr.cloud.overseer.ClusterStateMutator;
 import org.apache.solr.cloud.overseer.CollectionMutator;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index 570843a..e8d85ce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud;
 import java.io.IOException;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.handler.component.ShardHandler;