You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/23 10:30:51 UTC

[21/41] lucene-solr:jira/solr-11702: SOLR-11817: Move Collections API classes to their own package

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
deleted file mode 100644
index 426c879..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ /dev/null
@@ -1,1003 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
-import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrCloseable;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.DocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkConfigManager;
-import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionAdminParams;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.SuppressForbidden;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.handler.component.ShardHandler;
-import org.apache.solr.handler.component.ShardHandlerFactory;
-import org.apache.solr.handler.component.ShardRequest;
-import org.apache.solr.handler.component.ShardResponse;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.apache.solr.util.RTimer;
-import org.apache.solr.util.TimeOut;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
-import static org.apache.solr.common.cloud.DocCollection.SNITCH;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-import static org.apache.solr.common.util.Utils.makeMap;
-
-/**
- * A {@link OverseerMessageHandler} that handles Collections API related
- * overseer messages.
- */
-public class OverseerCollectionMessageHandler implements OverseerMessageHandler, SolrCloseable {
-
-  public static final String NUM_SLICES = "numShards";
-
-  static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
-  public static final String CREATE_NODE_SET_SHUFFLE = CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
-  public static final String CREATE_NODE_SET_EMPTY = "EMPTY";
-  public static final String CREATE_NODE_SET = CollectionAdminParams.CREATE_NODE_SET_PARAM;
-
-  public static final String ROUTER = "router";
-
-  public static final String SHARDS_PROP = "shards";
-
-  public static final String REQUESTID = "requestid";
-
-  public static final String COLL_CONF = "collection.configName";
-
-  public static final String COLL_PROP_PREFIX = "property.";
-
-  public static final String ONLY_IF_DOWN = "onlyIfDown";
-
-  public static final String SHARD_UNIQUE = "shardUnique";
-
-  public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
-
-  static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
-
-  public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
-      ROUTER, DocRouter.DEFAULT_NAME,
-      ZkStateReader.REPLICATION_FACTOR, "1",
-      ZkStateReader.NRT_REPLICAS, "1",
-      ZkStateReader.TLOG_REPLICAS, "0",
-      ZkStateReader.PULL_REPLICAS, "0",
-      ZkStateReader.MAX_SHARDS_PER_NODE, "1",
-      ZkStateReader.AUTO_ADD_REPLICAS, "false",
-      DocCollection.RULE, null,
-      POLICY, null,
-      SNITCH, null));
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  Overseer overseer;
-  ShardHandlerFactory shardHandlerFactory;
-  String adminPath;
-  ZkStateReader zkStateReader;
-  SolrCloudManager cloudManager;
-  String myId;
-  Stats stats;
-  TimeSource timeSource;
-
-  // Set that tracks collections that are currently being processed by a running task.
-  // This is used for handling mutual exclusion of the tasks.
-
-  final private LockTree lockTree = new LockTree();
-  ExecutorService tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
-      new SynchronousQueue<>(),
-      new DefaultSolrThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
-
-  static final Random RANDOM;
-  static {
-    // We try to make things reproducible in the context of our tests by initializing the random instance
-    // based on the current seed
-    String seed = System.getProperty("tests.seed");
-    if (seed == null) {
-      RANDOM = new Random();
-    } else {
-      RANDOM = new Random(seed.hashCode());
-    }
-  }
-
-  final Map<CollectionAction, Cmd> commandMap;
-
-  private volatile boolean isClosed;
-
-  public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
-                                        final ShardHandlerFactory shardHandlerFactory,
-                                        String adminPath,
-                                        Stats stats,
-                                        Overseer overseer,
-                                        OverseerNodePrioritizer overseerPrioritizer) {
-    this.zkStateReader = zkStateReader;
-    this.shardHandlerFactory = shardHandlerFactory;
-    this.adminPath = adminPath;
-    this.myId = myId;
-    this.stats = stats;
-    this.overseer = overseer;
-    this.cloudManager = overseer.getSolrCloudManager();
-    this.timeSource = cloudManager.getTimeSource();
-    this.isClosed = false;
-    commandMap = new ImmutableMap.Builder<CollectionAction, Cmd>()
-        .put(REPLACENODE, new ReplaceNodeCmd(this))
-        .put(DELETENODE, new DeleteNodeCmd(this))
-        .put(BACKUP, new BackupCmd(this))
-        .put(RESTORE, new RestoreCmd(this))
-        .put(CREATESNAPSHOT, new CreateSnapshotCmd(this))
-        .put(DELETESNAPSHOT, new DeleteSnapshotCmd(this))
-        .put(SPLITSHARD, new SplitShardCmd(this))
-        .put(ADDROLE, new OverseerRoleCmd(this, ADDROLE, overseerPrioritizer))
-        .put(REMOVEROLE, new OverseerRoleCmd(this, REMOVEROLE, overseerPrioritizer))
-        .put(MOCK_COLL_TASK, this::mockOperation)
-        .put(MOCK_SHARD_TASK, this::mockOperation)
-        .put(MOCK_REPLICA_TASK, this::mockOperation)
-        .put(MIGRATESTATEFORMAT, this::migrateStateFormat)
-        .put(CREATESHARD, new CreateShardCmd(this))
-        .put(MIGRATE, new MigrateCmd(this))
-        .put(CREATE, new CreateCollectionCmd(this))
-        .put(MODIFYCOLLECTION, this::modifyCollection)
-        .put(ADDREPLICAPROP, this::processReplicaAddPropertyCommand)
-        .put(DELETEREPLICAPROP, this::processReplicaDeletePropertyCommand)
-        .put(BALANCESHARDUNIQUE, this::balanceProperty)
-        .put(REBALANCELEADERS, this::processRebalanceLeaders)
-        .put(RELOAD, this::reloadCollection)
-        .put(DELETE, new DeleteCollectionCmd(this))
-        .put(CREATEALIAS, new CreateAliasCmd(this))
-        .put(DELETEALIAS, new DeleteAliasCmd(this))
-        .put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this))
-        .put(OVERSEERSTATUS, new OverseerStatusCmd(this))
-        .put(DELETESHARD, new DeleteShardCmd(this))
-        .put(DELETEREPLICA, new DeleteReplicaCmd(this))
-        .put(ADDREPLICA, new AddReplicaCmd(this))
-        .put(MOVEREPLICA, new MoveReplicaCmd(this))
-        .put(UTILIZENODE, new UtilizeNodeCmd(this))
-        .build()
-    ;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public SolrResponse processMessage(ZkNodeProps message, String operation) {
-    log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
-
-    NamedList results = new NamedList();
-    try {
-      CollectionAction action = getCollectionAction(operation);
-      Cmd command = commandMap.get(action);
-      if (command != null) {
-        command.call(cloudManager.getClusterStateProvider().getClusterState(), message, results);
-      } else {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
-            + operation);
-      }
-    } catch (Exception e) {
-      String collName = message.getStr("collection");
-      if (collName == null) collName = message.getStr(NAME);
-
-      if (collName == null) {
-        SolrException.log(log, "Operation " + operation + " failed", e);
-      } else  {
-        SolrException.log(log, "Collection: " + collName + " operation: " + operation
-            + " failed", e);
-      }
-
-      results.add("Operation " + operation + " caused exception:", e);
-      SimpleOrderedMap nl = new SimpleOrderedMap();
-      nl.add("msg", e.getMessage());
-      nl.add("rspCode", e instanceof SolrException ? ((SolrException)e).code() : -1);
-      results.add("exception", nl);
-    }
-    return new OverseerSolrResponse(results);
-  }
-
-  @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
-  private void mockOperation(ClusterState state, ZkNodeProps message, NamedList results) throws InterruptedException {
-    //only for test purposes
-    Thread.sleep(message.getInt("sleep", 1));
-    log.info("MOCK_TASK_EXECUTED time {} data {}", System.currentTimeMillis(), Utils.toJSONString(message));
-    results.add("MOCK_FINISHED", System.currentTimeMillis());
-  }
-
-  private CollectionAction getCollectionAction(String operation) {
-    CollectionAction action = CollectionAction.get(operation);
-    if (action == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
-    }
-    return action;
-  }
-
-  private void reloadCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
-
-    String asyncId = message.getStr(ASYNC);
-    Map<String, String> requestMap = null;
-    if (asyncId != null) {
-      requestMap = new HashMap<>();
-    }
-    collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, NamedList results)
-      throws Exception {
-    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
-        CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
-
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
-    params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
-    params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
-    params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
-    params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
-    params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
-    params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
-
-    String baseUrl = message.getStr(BASE_URL_PROP);
-    ShardRequest sreq = new ShardRequest();
-    sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
-    // yes, they must use same admin handler path everywhere...
-    params.set("qt", adminPath);
-    sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
-    sreq.shards = new String[] {baseUrl};
-    sreq.actualShards = sreq.shards;
-    sreq.params = params;
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-    shardHandler.submit(sreq, baseUrl, sreq.params);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
-      throws Exception {
-    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
-    Map<String, Object> propMap = new HashMap<>();
-    propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
-    propMap.putAll(message.getProperties());
-    ZkNodeProps m = new ZkNodeProps(propMap);
-    inQueue.offer(Utils.toJSON(m));
-  }
-
-  private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
-      throws Exception {
-    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
-    Map<String, Object> propMap = new HashMap<>();
-    propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
-    propMap.putAll(message.getProperties());
-    ZkNodeProps m = new ZkNodeProps(propMap);
-    inQueue.offer(Utils.toJSON(m));
-  }
-
-  private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
-    if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) || StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
-              "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
-    }
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkClient);
-    Map<String, Object> propMap = new HashMap<>();
-    propMap.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
-    propMap.putAll(message.getProperties());
-    inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-  }
-
-  /**
-   * Walks the tree of collection status to verify that any replicas not reporting a "down" status is
-   * on a live node, if any replicas reporting their status as "active" but the node is not live is
-   * marked as "down"; used by CLUSTERSTATUS.
-   * @param liveNodes List of currently live node names.
-   * @param collectionProps Map of collection status information pulled directly from ZooKeeper.
-   */
-
-  @SuppressWarnings("unchecked")
-  protected void crossCheckReplicaStateWithLiveNodes(List<String> liveNodes, NamedList<Object> collectionProps) {
-    Iterator<Map.Entry<String,Object>> colls = collectionProps.iterator();
-    while (colls.hasNext()) {
-      Map.Entry<String,Object> next = colls.next();
-      Map<String,Object> collMap = (Map<String,Object>)next.getValue();
-      Map<String,Object> shards = (Map<String,Object>)collMap.get("shards");
-      for (Object nextShard : shards.values()) {
-        Map<String,Object> shardMap = (Map<String,Object>)nextShard;
-        Map<String,Object> replicas = (Map<String,Object>)shardMap.get("replicas");
-        for (Object nextReplica : replicas.values()) {
-          Map<String,Object> replicaMap = (Map<String,Object>)nextReplica;
-          if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
-            // not down, so verify the node is live
-            String node_name = (String)replicaMap.get(ZkStateReader.NODE_NAME_PROP);
-            if (!liveNodes.contains(node_name)) {
-              // node is not live, so this replica is actually down
-              replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
-            }
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Get collection status from cluster state.
-   * Can return collection status by given shard name.
-   *
-   *
-   * @param collection collection map parsed from JSON-serialized {@link ClusterState}
-   * @param name  collection name
-   * @param requestedShards a set of shards to be returned in the status.
-   *                        An empty or null values indicates <b>all</b> shards.
-   * @return map of collection properties
-   */
-  @SuppressWarnings("unchecked")
-  private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, Set<String> requestedShards) {
-    if (collection == null)  {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " not found");
-    }
-    if (requestedShards == null || requestedShards.isEmpty()) {
-      return collection;
-    } else {
-      Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
-      Map<String, Object>  selected = new HashMap<>();
-      for (String selectedShard : requestedShards) {
-        if (!shards.containsKey(selectedShard)) {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
-        }
-        selected.put(selectedShard, shards.get(selectedShard));
-        collection.put("shards", selected);
-      }
-      return collection;
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
-      throws Exception {
-    ((DeleteReplicaCmd) commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, message, results, onComplete);
-
-  }
-
-  boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
-    TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
-    while (! timeout.hasTimedOut()) {
-      timeout.sleep(100);
-      DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
-      if (docCollection == null) { // someone already deleted the collection
-        return true;
-      }
-      Slice slice = docCollection.getSlice(shard);
-      if(slice == null || slice.getReplica(replicaName) == null) {
-        return true;
-      }
-    }
-    // replica still exists after the timeout
-    return false;
-  }
-
-  void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
-    ZkNodeProps m = new ZkNodeProps(
-        Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
-        ZkStateReader.CORE_NAME_PROP, core,
-        ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
-        ZkStateReader.COLLECTION_PROP, collectionName,
-        ZkStateReader.CORE_NODE_NAME_PROP, replicaName,
-        ZkStateReader.BASE_URL_PROP, replica.getStr(ZkStateReader.BASE_URL_PROP));
-    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-  }
-
-  void checkRequired(ZkNodeProps message, String... props) {
-    for (String prop : props) {
-      if(message.get(prop) == null){
-        throw new SolrException(ErrorCode.BAD_REQUEST, StrUtils.join(Arrays.asList(props),',') +" are required params" );
-      }
-    }
-
-  }
-
-  //TODO should we not remove in the next release ?
-  private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    final String collectionName = message.getStr(COLLECTION_PROP);
-
-    boolean firstLoop = true;
-    // wait for a while until the state format changes
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-    while (! timeout.hasTimedOut()) {
-      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-      if (collection == null) {
-        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
-      }
-      if (collection.getStateFormat() == 2) {
-        // Done.
-        results.add("success", new SimpleOrderedMap<>());
-        return;
-      }
-
-      if (firstLoop) {
-        // Actually queue the migration command.
-        firstLoop = false;
-        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
-        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
-      }
-      timeout.sleep(100);
-    }
-    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
-  }
-
-  void commit(NamedList results, String slice, Replica parentShardLeader) {
-    log.debug("Calling soft commit to make sub shard updates visible");
-    String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
-    // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
-    // and we force open a searcher so that we have documents to show upon switching states
-    UpdateResponse updateResponse = null;
-    try {
-      updateResponse = softCommit(coreUrl);
-      processResponse(results, null, coreUrl, updateResponse, slice, Collections.emptySet());
-    } catch (Exception e) {
-      processResponse(results, e, coreUrl, updateResponse, slice, Collections.emptySet());
-      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
-    }
-  }
-
-
-  static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
-
-    try (HttpSolrClient client = new HttpSolrClient.Builder(url)
-        .withConnectionTimeout(30000)
-        .withSocketTimeout(120000)
-        .build()) {
-      UpdateRequest ureq = new UpdateRequest();
-      ureq.setParams(new ModifiableSolrParams());
-      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
-      return ureq.process(client);
-    }
-  }
-
-  String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
-    int retryCount = 320;
-    while (retryCount-- > 0) {
-      final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
-      if (docCollection != null && docCollection.getSlicesMap() != null) {
-        Map<String,Slice> slicesMap = docCollection.getSlicesMap();
-        for (Slice slice : slicesMap.values()) {
-          for (Replica replica : slice.getReplicas()) {
-            // TODO: for really large clusters, we could 'index' on this
-
-            String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
-            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-
-            if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
-              return replica.getName();
-            }
-          }
-        }
-      }
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
-  }
-
-  void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
-    log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
-    RTimer timer = new RTimer();
-    int retryCount = 320;
-    while (retryCount-- > 0) {
-      DocCollection collection = zkStateReader.getClusterState().getCollection(collectionName);
-      if (collection == null) {
-        throw new SolrException(ErrorCode.SERVER_ERROR,
-            "Unable to find collection: " + collectionName + " in clusterstate");
-      }
-      Slice slice = collection.getSlice(sliceName);
-      if (slice != null) {
-        log.debug("Waited for {}ms for slice {} of collection {} to be available",
-            timer.getTime(), sliceName, collectionName);
-        return;
-      }
-      Thread.sleep(1000);
-    }
-    throw new SolrException(ErrorCode.SERVER_ERROR,
-        "Could not find new slice " + sliceName + " in collection " + collectionName
-            + " even after waiting for " + timer.getTime() + "ms"
-    );
-  }
-
-  DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
-    if (a == null || b == null || !a.overlaps(b)) {
-      return null;
-    } else if (a.isSubsetOf(b))
-      return a;
-    else if (b.isSubsetOf(a))
-      return b;
-    else if (b.includes(a.max)) {
-      return new DocRouter.Range(b.min, a.max);
-    } else  {
-      return new DocRouter.Range(a.min, b.max);
-    }
-  }
-
-  void sendShardRequest(String nodeName, ModifiableSolrParams params,
-                        ShardHandler shardHandler, String asyncId,
-                        Map<String, String> requestMap) {
-    sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
-
-  }
-
-  public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
-                                      String asyncId, Map<String, String> requestMap, String adminPath,
-                                      ZkStateReader zkStateReader) {
-    if (asyncId != null) {
-      String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
-      params.set(ASYNC, coreAdminAsyncId);
-      requestMap.put(nodeName, coreAdminAsyncId);
-    }
-
-    ShardRequest sreq = new ShardRequest();
-    params.set("qt", adminPath);
-    sreq.purpose = 1;
-    String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
-    sreq.shards = new String[]{replica};
-    sreq.actualShards = sreq.shards;
-    sreq.nodeName = nodeName;
-    sreq.params = params;
-
-    shardHandler.submit(sreq, replica, sreq.params);
-  }
-
-  void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
-    // Now add the property.key=value pairs
-    for (String key : message.keySet()) {
-      if (key.startsWith(COLL_PROP_PREFIX)) {
-        params.set(key, message.getStr(key));
-      }
-    }
-  }
-
-  void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
-    // Now add the property.key=value pairs
-    for (String key : message.keySet()) {
-      if (key.startsWith(COLL_PROP_PREFIX)) {
-        map.put(key, message.getStr(key));
-      }
-    }
-  }
-
-
-  private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
-      throws Exception {
-    
-    final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
-    //the rest of the processing is based on writing cluster state properties
-    //remove the property here to avoid any errors down the pipeline due to this property appearing
-    String configName = (String) message.getProperties().remove(COLL_CONF);
-    
-    if(configName != null) {
-      validateConfigOrThrowSolrException(configName);
-      
-      boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
-      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
-      reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
-    }
-    
-    overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
-
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-    boolean areChangesVisible = true;
-    while (!timeout.hasTimedOut()) {
-      DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
-      areChangesVisible = true;
-      for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
-        String updateKey = updateEntry.getKey();
-        if (!updateKey.equals(ZkStateReader.COLLECTION_PROP)
-            && !updateKey.equals(Overseer.QUEUE_OPERATION)
-            && !collection.get(updateKey).equals(updateEntry.getValue())){
-          areChangesVisible = false;
-          break;
-        }
-      }
-      if (areChangesVisible) break;
-      timeout.sleep(100);
-    }
-
-    if (!areChangesVisible)
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
-  }
-
-  void cleanupCollection(String collectionName, NamedList results) throws Exception {
-    log.error("Cleaning up collection [" + collectionName + "]." );
-    Map<String, Object> props = makeMap(
-        Overseer.QUEUE_OPERATION, DELETE.toLower(),
-        NAME, collectionName);
-    commandMap.get(DELETE).call(zkStateReader.getClusterState(), new ZkNodeProps(props), results);
-  }
-
-  Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
-    Map<String, Replica> result = new HashMap<>();
-    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
-    while (true) {
-      DocCollection coll = zkStateReader.getClusterState().getCollection(collectionName);
-      for (String coreName : coreNames) {
-        if (result.containsKey(coreName)) continue;
-        for (Slice slice : coll.getSlices()) {
-          for (Replica replica : slice.getReplicas()) {
-            if (coreName.equals(replica.getStr(ZkStateReader.CORE_NAME_PROP))) {
-              result.put(coreName, replica);
-              break;
-            }
-          }
-        }
-      }
-      
-      if (result.size() == coreNames.size()) {
-        return result;
-      } else {
-        log.debug("Expecting {} cores but found {}", coreNames, result);
-      }
-      if (timeout.hasTimedOut()) {
-        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + coll);
-      }
-      
-      Thread.sleep(100);
-    }
-  }
-
-  ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
-      throws Exception {
-
-    return ((AddReplicaCmd) commandMap.get(ADDREPLICA)).addReplica(clusterState, message, results, onComplete);
-  }
-
-  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
-                        String asyncId, Map<String, String> requestMap) {
-    processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, Collections.emptySet());
-  }
-
-  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
-                                String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
-    //Processes all shard responses
-    ShardResponse srsp;
-    do {
-      srsp = shardHandler.takeCompletedOrError();
-      if (srsp != null) {
-        processResponse(results, srsp, okayExceptions);
-        Throwable exception = srsp.getException();
-        if (abortOnError && exception != null)  {
-          // drain pending requests
-          while (srsp != null)  {
-            srsp = shardHandler.takeCompletedOrError();
-          }
-          throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, exception);
-        }
-      }
-    } while (srsp != null);
-
-    //If request is async wait for the core admin to complete before returning
-    if (asyncId != null) {
-      waitForAsyncCallsToComplete(requestMap, results);
-      requestMap.clear();
-    }
-  }
-
-
-  void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
-    boolean isValid = cloudManager.getDistribStateManager().hasData(ZkConfigManager.CONFIGS_ZKNODE + "/" + configName);
-    if(!isValid) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
-    }
-  }
-
-  /**
-   * Writes the config name into the collection's node, creating the node if it has no data yet.
-   * The config (path) itself is not validated here; callers are expected to have done that
-   * before the config node is created.
-   *
-   * @param configName    config set name to store; when null, a warning is logged in legacy
-   *                      cloud mode and a BAD_REQUEST {@link SolrException} is thrown otherwise
-   * @param coll          collection whose node is written
-   * @param isLegacyCloud whether a missing config name is tolerated
-   */
-  public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
-
-    if (configName == null) {
-      if (!isLegacyCloud) {
-        throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
-      }
-      log.warn("Could not obtain config name");
-      return;
-    }
-
-    final String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
-    log.debug("creating collections conf node {} ", collectionPath);
-    final byte[] payload = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
-    if (!stateManager.hasData(collectionPath)) {
-      stateManager.makePath(collectionPath, payload, CreateMode.PERSISTENT, false);
-      return;
-    }
-    // node already holds data: overwrite regardless of its current version
-    stateManager.setData(collectionPath, payload, -1);
-  }
-  
-  private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-                             NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
-    collectionCmd( message, params, results, stateMatcher, asyncId, requestMap, Collections.emptySet());
-  }
-
-
-  void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
-                     NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
-    log.info("Executing Collection Cmd : " + params);
-    String collectionName = message.getStr(NAME);
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-
-    ClusterState clusterState = zkStateReader.getClusterState();
-    DocCollection coll = clusterState.getCollection(collectionName);
-    
-    for (Slice slice : coll.getSlices()) {
-      sliceCmd(clusterState, params, stateMatcher, slice, shardHandler, asyncId, requestMap);
-    }
-
-    processResponses(results, shardHandler, false, null, asyncId, requestMap, okayExceptions);
-
-  }
-
-  void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
-                Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
-
-    for (Replica replica : slice.getReplicas()) {
-      if (clusterState.liveNodesContain(replica.getStr(ZkStateReader.NODE_NAME_PROP))
-          && (stateMatcher == null || Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) == stateMatcher)) {
-
-        // For thread safety, only simple clone the ModifiableSolrParams
-        ModifiableSolrParams cloneParams = new ModifiableSolrParams();
-        cloneParams.add(params);
-        cloneParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
-
-        sendShardRequest(replica.getStr(ZkStateReader.NODE_NAME_PROP), cloneParams, shardHandler, asyncId, requestMap);
-      }
-    }
-  }
-  
-  /**
-   * Unpacks the exception, node name, Solr response and shard of {@code srsp} and hands them to
-   * the six-argument {@code processResponse}.
-   */
-  private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
-    processResponse(
-        results,
-        srsp.getException(),
-        srsp.getNodeName(),
-        srsp.getSolrResponse(),
-        srsp.getShard(),
-        okayExceptions);
-  }
-
-  @SuppressWarnings("unchecked")
-  private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
-    String rootThrowable = null;
-    if (e instanceof RemoteSolrException) {
-      rootThrowable = ((RemoteSolrException) e).getRootThrowable();
-    }
-
-    if (e != null && (rootThrowable == null || !okayExceptions.contains(rootThrowable))) {
-      log.error("Error from shard: " + shard, e);
-
-      SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
-      if (failure == null) {
-        failure = new SimpleOrderedMap();
-        results.add("failure", failure);
-      }
-
-      failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
-
-    } else {
-
-      SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
-      if (success == null) {
-        success = new SimpleOrderedMap();
-        results.add("success", success);
-      }
-
-      success.add(nodeName, solrResponse.getResponse());
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
-    for (String k:requestMap.keySet()) {
-      log.debug("I am Waiting for :{}/{}", k, requestMap.get(k));
-      results.add(requestMap.get(k), waitForCoreAdminAsyncCallToComplete(k, requestMap.get(k)));
-    }
-  }
-
-  /**
-   * Repeatedly sends a REQUESTSTATUS core admin request for {@code requestId} to the given node
-   * until the task reports a terminal status.
-   *
-   * <ul>
-   *   <li>"running": sleeps one second and asks again</li>
-   *   <li>"completed" or "failed": returns the node's response</li>
-   *   <li>"notfound": retried up to five times, one second apart, then rejected</li>
-   *   <li>a response without a body: returns a synthetic {@code STATUS=failed} list</li>
-   * </ul>
-   *
-   * @param nodeName  node to query; resolved to a base URL through the ZK state reader
-   * @param requestId id of the async core admin request
-   * @return the final status response
-   * @throws SolrException (BAD_REQUEST) for an unknown or missing status, or when the request
-   *                       is still "notfound" after the retries
-   */
-  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
-    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
-    params.set(CoreAdminParams.REQUESTID, requestId);
-    int counter = 0;
-    ShardRequest sreq;
-    do {
-      sreq = new ShardRequest();
-      params.set("qt", adminPath);
-      sreq.purpose = 1;
-      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
-      sreq.shards = new String[] {replica};
-      sreq.actualShards = sreq.shards;
-      sreq.params = params;
-
-      shardHandler.submit(sreq, replica, sreq.params);
-
-      ShardResponse srsp;
-      do {
-        srsp = shardHandler.takeCompletedOrError();
-        if (srsp != null) {
-          NamedList results = new NamedList();
-          processResponse(results, srsp, Collections.emptySet());
-          if (srsp.getSolrResponse().getResponse() == null) {
-            NamedList response = new NamedList();
-            response.add("STATUS", "failed");
-            return response;
-          }
-
-          // Comparisons below are literal-first so that a missing STATUS falls through to the
-          // "invalid status" branch instead of raising a NullPointerException.
-          String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
-          if ("running".equals(r)) {
-            log.debug("The task is still RUNNING, continuing to wait.");
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-            }
-            continue;
-
-          } else if ("completed".equals(r)) {
-            log.debug("The task is COMPLETED, returning");
-            return srsp.getSolrResponse().getResponse();
-          } else if ("failed".equals(r)) {
-            // TODO: Improve this. Get more information.
-            log.debug("The task is FAILED, returning");
-            return srsp.getSolrResponse().getResponse();
-          } else if ("notfound".equals(r)) {
-            log.debug("The task is notfound, retry");
-            if (counter++ < 5) {
-              try {
-                Thread.sleep(1000);
-              } catch (InterruptedException e) {
-                // keep the interrupt visible to callers, as the "running" branch does
-                Thread.currentThread().interrupt();
-              }
-              // leave the inner loop so the status request is submitted again
-              break;
-            }
-            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request for requestId: " + requestId + " " + r +
-                " retried " + counter + " times");
-          } else {
-            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + r);
-          }
-        }
-      } while (srsp != null);
-    } while(true);
-  }
-
-  /** Returns the fixed display name of this message handler. */
-  @Override
-  public String getName() {
-    final String handlerName = "Overseer Collection Message Handler";
-    return handlerName;
-  }
-
-  /** Returns the timer name for an operation: the operation prefixed with {@code collection_}. */
-  @Override
-  public String getTimerName(String operation) {
-    final String prefix = "collection_";
-    return prefix + operation;
-  }
-
-  /**
-   * Returns the key identifying the task's target: the message's {@code COLLECTION_PROP} value
-   * when that property is present, otherwise its {@code NAME} value.
-   */
-  @Override
-  public String getTaskKey(ZkNodeProps message) {
-    if (message.containsKey(COLLECTION_PROP)) {
-      return message.getStr(COLLECTION_PROP);
-    }
-    return message.getStr(NAME);
-  }
-
-
-  // id of the task batch that lockSession was created for; -1 until the first batch is seen
-  private long sessionId = -1;
-  // lock session shared by all tasks of the current batch
-  private LockTree.Session lockSession;
-
-  /**
-   * Tries to acquire the lock for a task, at the level given by its collection action and along
-   * the path (task key, shard, replica) read from the message.
-   *
-   * A new lock session is created whenever the task batch changes, so all tasks of one batch
-   * share a session.
-   *
-   * @param message   the task; supplies the operation, task key, shard and replica
-   * @param taskBatch the batch the task belongs to
-   * @return whatever {@code LockTree.Session.lock} returns for that path
-   */
-  @Override
-  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
-    if (lockSession == null || sessionId != taskBatch.getId()) {
-      //this is always called in the same thread.
-      //Each batch is supposed to have a new taskBatch
-      //So if taskBatch changes we must create a new Session
-      // also check if the running tasks are empty. If yes, clear lockTree
-      // this will ensure that locks are not 'leaked'
-      if(taskBatch.getRunningTasks() == 0) lockTree.clear();
-      lockSession = lockTree.getSession();
-      // Remember which batch the session belongs to. Without this the id comparison above is
-      // always true and every task gets a fresh session instead of one session per batch.
-      sessionId = taskBatch.getId();
-    }
-    return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
-        Arrays.asList(
-            getTaskKey(message),
-            message.getStr(ZkStateReader.SHARD_ID_PROP),
-            message.getStr(ZkStateReader.REPLICA_PROP))
-
-    );
-  }
-
-
-  /**
-   * Marks this handler as closed and, if the executor {@code tpe} exists and has not been shut
-   * down yet, shuts it down and waits for termination.
-   */
-  @Override
-  public void close() throws IOException {
-    this.isClosed = true;
-    final boolean executorNeedsShutdown = tpe != null && !tpe.isShutdown();
-    if (executorNeedsShutdown) {
-      ExecutorUtil.shutdownAndAwaitTermination(tpe);
-    }
-  }
-
-  /** Reports whether {@link #close()} has been called on this handler. */
-  @Override
-  public boolean isClosed() {
-    return this.isClosed;
-  }
-
-  /**
-   * A single command that this handler can execute. Implementations are looked up in the
-   * handler's command map and invoked through {@link #call}.
-   */
-  interface Cmd {
-    /**
-     * Executes the command.
-     *
-     * @param state   cluster state handed in by the caller
-     * @param message the request properties
-     * @param results list the command adds its outcome to
-     */
-    void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java b/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
deleted file mode 100644
index 0f450bd..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerRoleCmd.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams.CollectionAction;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-
-/**
- * Command that adds a node to, or removes a node from, a role list stored as JSON at
- * {@code ZkStateReader.ROLES}, and then triggers overseer re-prioritization in a background thread.
- */
-public class OverseerRoleCmd implements Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-  private final CollectionAction operation;
-  private final OverseerNodePrioritizer overseerPrioritizer;
-
-  /**
-   * @param ocmh        handler supplying the ZK state reader and the overseer id
-   * @param operation   the action to apply; {@code ADDROLE} and {@code REMOVEROLE} modify the list
-   * @param prioritizer used to re-prioritize overseer nodes after the roles are written
-   */
-  public OverseerRoleCmd(OverseerCollectionMessageHandler ocmh, CollectionAction operation, OverseerNodePrioritizer prioritizer) {
-    this.ocmh = ocmh;
-    this.operation = operation;
-    this.overseerPrioritizer = prioritizer;
-  }
-
-  /**
-   * Reads the current roles (or starts from an empty map when the roles node does not exist),
-   * applies the add/remove for the message's {@code node} under its {@code role}, and writes the
-   * result back.
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    final SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
-    final String node = message.getStr("node");
-    final String roleName = message.getStr("role");
-
-    final boolean rolesNodeExists = zkClient.exists(ZkStateReader.ROLES, true);
-    final Map roles;
-    if (rolesNodeExists) {
-      roles = (Map) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true));
-    } else {
-      roles = new LinkedHashMap(1);
-    }
-
-    List nodeList = (List) roles.get(roleName);
-    if (nodeList == null) {
-      nodeList = new ArrayList();
-      roles.put(roleName, nodeList);
-    }
-
-    if (ADDROLE == operation) {
-      log.info("Overseer role added to {}", node);
-      if (!nodeList.contains(node)) {
-        nodeList.add(node);
-      }
-    } else if (REMOVEROLE == operation) {
-      log.info("Overseer role removed from {}", node);
-      nodeList.remove(node);
-    }
-
-    final byte[] serializedRoles = Utils.toJSON(roles);
-    if (rolesNodeExists) {
-      zkClient.setData(ZkStateReader.ROLES, serializedRoles, true);
-    } else {
-      zkClient.create(ZkStateReader.ROLES, serializedRoles, CreateMode.PERSISTENT, true);
-    }
-
-    //if there are too many nodes this command may time out. And most likely dedicated
-    // overseers are created when there are too many nodes  . So , do this operation in a separate thread
-    new Thread(this::prioritizeOverseerNodes).start();
-  }
-
-  /** Runs the overseer prioritization, logging (not propagating) any failure. */
-  private void prioritizeOverseerNodes() {
-    try {
-      overseerPrioritizer.prioritizeOverseerNodes(ocmh.myId);
-    } catch (Exception e) {
-      log.error("Error in prioritizing Overseer", e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
deleted file mode 100644
index aba4872..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerStatusCmd.java
+++ /dev/null
@@ -1,112 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.codahale.metrics.Timer;
-import org.apache.solr.cloud.OverseerCollectionMessageHandler.Cmd;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.util.stats.MetricUtils;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Command that reports the overseer leader, the child counts of the overseer queues and the
- * per-operation statistics held by the message handler.
- */
-public class OverseerStatusCmd implements Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  // Stat-key prefixes; the prefix is stripped from the key before it is reported.
-  private static final String COLLECTION_PREFIX = "collection_";
-  private static final String STATE_QUEUE_PREFIX = "/overseer/queue_";
-  private static final String WORK_QUEUE_PREFIX = "/overseer/queue-work_";
-  private static final String COLLECTION_QUEUE_PREFIX = "/overseer/collection-queue-work_";
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public OverseerStatusCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  /**
-   * Fills {@code results} with the leader node, three queue sizes and five groups of
-   * statistics (overseer operations, collection operations and the three queues).
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    final ZkStateReader zkStateReader = ocmh.zkStateReader;
-    results.add("leader", OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient()));
-    results.add("overseer_queue_size", childCount(zkStateReader, "/overseer/queue"));
-    results.add("overseer_work_queue_size", childCount(zkStateReader, "/overseer/queue-work"));
-    results.add("overseer_collection_queue_size", childCount(zkStateReader, "/overseer/collection-queue-work"));
-
-    final NamedList overseerStats = new NamedList();
-    final NamedList collectionStats = new NamedList();
-    final NamedList stateUpdateQueueStats = new NamedList();
-    final NamedList workQueueStats = new NamedList();
-    final NamedList collectionQueueStats = new NamedList();
-    final Stats stats = ocmh.stats;
-    for (Map.Entry<String, Stats.Stat> entry : stats.getStats().entrySet()) {
-      final String key = entry.getKey();
-      final NamedList<Object> lst = new SimpleOrderedMap<>();
-      if (key.startsWith(COLLECTION_PREFIX)) {
-        collectionStats.add(key.substring(COLLECTION_PREFIX.length()), lst);
-        addRequestCounts(stats, key, lst);
-        addRecentFailures(stats, key, lst);
-      } else if (key.startsWith(STATE_QUEUE_PREFIX)) {
-        stateUpdateQueueStats.add(key.substring(STATE_QUEUE_PREFIX.length()), lst);
-      } else if (key.startsWith(WORK_QUEUE_PREFIX)) {
-        workQueueStats.add(key.substring(WORK_QUEUE_PREFIX.length()), lst);
-      } else if (key.startsWith(COLLECTION_QUEUE_PREFIX)) {
-        collectionQueueStats.add(key.substring(COLLECTION_QUEUE_PREFIX.length()), lst);
-      } else {
-        // overseer stats
-        overseerStats.add(key, lst);
-        addRequestCounts(stats, key, lst);
-      }
-      final Timer timer = entry.getValue().requestTime;
-      MetricUtils.addMetrics(lst, timer);
-    }
-    results.add("overseer_operations", overseerStats);
-    results.add("collection_operations", collectionStats);
-    results.add("overseer_queue", stateUpdateQueueStats);
-    results.add("overseer_internal_queue", workQueueStats);
-    results.add("collection_queue", collectionQueueStats);
-
-  }
-
-  /** Reads the node at {@code path} and returns the child count from its stat. */
-  private static int childCount(ZkStateReader zkStateReader, String path) throws Exception {
-    final Stat stat = new Stat();
-    zkStateReader.getZkClient().getData(path, null, stat, true);
-    return stat.getNumChildren();
-  }
-
-  /** Adds the success count as "requests" and the error count as "errors" for {@code key}. */
-  private static void addRequestCounts(Stats stats, String key, NamedList<Object> lst) {
-    final int successes = stats.getSuccessCount(key);
-    final int errors = stats.getErrorCount(key);
-    lst.add("requests", successes);
-    lst.add("errors", errors);
-  }
-
-  /** Adds a "recent_failures" list of request/response pairs when failure details exist. */
-  private static void addRecentFailures(Stats stats, String key, NamedList<Object> lst) {
-    final List<Stats.FailedOp> failureDetails = stats.getFailureDetails(key);
-    if (failureDetails == null) {
-      return;
-    }
-    final List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
-    for (Stats.FailedOp failedOp : failureDetails) {
-      final SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
-      fail.add("request", failedOp.req.getProperties());
-      fail.add("response", failedOp.resp.getResponse());
-      failures.add(fail);
-    }
-    lst.add("recent_failures", failures);
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
deleted file mode 100644
index e903091..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/ReplaceNodeCmd.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.solr.common.SolrCloseableLatch;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.CollectionStateWatcher;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.params.CommonAdminParams;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-
-/**
- * Command that re-creates every replica hosted on a source node on a target node and, once all
- * of them have been created (and, where required, recovered), removes the replicas from the
- * source node. If anything fails or times out, the replicas created on the target are deleted
- * again.
- */
-public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public ReplaceNodeCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  /**
-   * Executes the node replacement.
-   *
-   * @param state   cluster state handed to the final cleanup of the source replicas
-   * @param message request; reads source/target node, {@code waitForFinalState}, {@code async},
-   *                {@code timeout} (seconds, default 600) and {@code parallel}
-   * @param results receives "failure" entries, or a "success" entry when everything worked
-   * @throws SolrException (BAD_REQUEST) when source or target is missing or not live
-   */
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    String source = message.getStr(CollectionParams.SOURCE_NODE, message.getStr("source"));
-    String target = message.getStr(CollectionParams.TARGET_NODE, message.getStr("target"));
-    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
-    if (source == null || target == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode and targetNode are required params" );
-    }
-    String async = message.getStr("async");
-    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
-    boolean parallel = message.getBool("parallel", false);
-    ClusterState clusterState = zkStateReader.getClusterState();
-
-    if (!clusterState.liveNodesContain(source)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
-    }
-    if (!clusterState.liveNodesContain(target)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
-    }
-    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
-    // how many leaders are we moving? for these replicas we have to make sure that either:
-    // * another existing replica can become a leader, or
-    // * we wait until the newly created replica completes recovery (and can become the new leader)
-    // If waitForFinalState=true we wait for all replicas
-    int numLeaders = 0;
-    for (ZkNodeProps props : sourceReplicas) {
-      if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-        numLeaders++;
-      }
-    }
-    // map of collectionName_coreNodeName to watchers
-    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
-    // same keys as 'watchers', mapped to the collection each watcher was registered on;
-    // needed so that the watcher can later be removed from the right collection
-    Map<String, String> watcherCollections = new HashMap<>();
-    List<ZkNodeProps> createdReplicas = new ArrayList<>();
-
-    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
-    SolrCloseableLatch countDownLatch = new SolrCloseableLatch(sourceReplicas.size(), ocmh);
-
-    SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ocmh);
-
-    for (ZkNodeProps sourceReplica : sourceReplicas) {
-      NamedList nl = new NamedList();
-      log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
-      ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
-      if(async!=null) msg.getProperties().put(ASYNC, async);
-      final ZkNodeProps addedReplica = ocmh.addReplica(clusterState,
-          msg, nl, () -> {
-            countDownLatch.countDown();
-            if (nl.get("failure") != null) {
-              String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-                  " on node=%s", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
-              log.warn(errorString);
-              // one replica creation failed. Make the best attempt to
-              // delete all the replicas created so far in the target
-              // and exit
-              synchronized (results) {
-                results.add("failure", errorString);
-                anyOneFailed.set(true);
-              }
-            } else {
-              log.debug("Successfully created replica for collection={} shard={} on node={}",
-                  sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
-            }
-          });
-
-      if (addedReplica != null) {
-        createdReplicas.add(addedReplica);
-        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-          String collectionName = sourceReplica.getStr(COLLECTION_PROP);
-          String key = collectionName + "_" + replicaName;
-          CollectionStateWatcher watcher;
-          if (waitForFinalState) {
-            watcher = new ActiveReplicaWatcher(collectionName, null,
-                Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
-          } else {
-            watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
-                addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
-          }
-          watchers.put(key, watcher);
-          watcherCollections.put(key, collectionName);
-          log.debug("--- adding {}, {}", key, watcher);
-          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
-        } else {
-          log.debug("--- not waiting for {}", addedReplica);
-        }
-      }
-    }
-
-    log.debug("Waiting for replicas to be added");
-    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for replicas to be added");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for replicas to be added");
-    }
-
-    // now wait for leader replicas to recover
-    log.debug("Waiting for {} leader replicas to recover", numLeaders);
-    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
-      log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
-      anyOneFailed.set(true);
-    } else {
-      log.debug("Finished waiting for leader replicas to recover");
-    }
-    // remove the watchers, we're done either way.
-    // Watchers were registered per collection, so they must be removed with the collection
-    // name and not with the composite collectionName_replicaName map key.
-    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
-      zkStateReader.removeCollectionStateWatcher(watcherCollections.get(e.getKey()), e.getValue());
-    }
-    if (anyOneFailed.get()) {
-      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
-      SolrCloseableLatch cleanupLatch = new SolrCloseableLatch(createdReplicas.size(), ocmh);
-      for (ZkNodeProps createdReplica : createdReplicas) {
-        NamedList deleteResult = new NamedList();
-        try {
-          ocmh.deleteReplica(zkStateReader.getClusterState(), createdReplica.plus("parallel", "true"), deleteResult, () -> {
-            cleanupLatch.countDown();
-            if (deleteResult.get("failure") != null) {
-              synchronized (results) {
-                results.add("failure", "Could not cleanup, because of : " + deleteResult.get("failure"));
-              }
-            }
-          });
-        } catch (KeeperException e) {
-          // best effort: count the replica as handled and continue with the others
-          cleanupLatch.countDown();
-          log.warn("Error deleting replica ", e);
-        } catch (Exception e) {
-          log.warn("Error deleting replica ", e);
-          cleanupLatch.countDown();
-          throw e;
-        }
-      }
-      cleanupLatch.await(5, TimeUnit.MINUTES);
-      return;
-    }
-
-
-    // we have reached this far means all replicas could be recreated
-    //now cleanup the replicas in the source node
-    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh, source, async);
-    results.add("success", "REPLACENODE action completed successfully from  : " + source + " to : " + target);
-  }
-
-  /**
-   * Collects a description of every replica, across all collections of {@code state}, whose
-   * node name equals {@code source}.
-   *
-   * @return one {@link ZkNodeProps} per replica holding its collection, shard, core name,
-   *         replica name, replica type, whether it is the slice leader, and the node
-   */
-  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
-    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
-    for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
-      for (Slice slice : e.getValue().getSlices()) {
-        for (Replica replica : slice.getReplicas()) {
-          if (source.equals(replica.getNodeName())) {
-            ZkNodeProps props = new ZkNodeProps(
-                COLLECTION_PROP, e.getKey(),
-                SHARD_ID_PROP, slice.getName(),
-                ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
-                ZkStateReader.REPLICA_PROP, replica.getName(),
-                ZkStateReader.REPLICA_TYPE, replica.getType().name(),
-                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
-                CoreAdminParams.NODE, source);
-            sourceReplicas.add(props);
-          }
-        }
-      }
-    }
-    return sourceReplicas;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
deleted file mode 100644
index 9c9a5c9..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RestoreCmd.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-
-import java.lang.invoke.MethodHandles;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
-import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ReplicaPosition;
-import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.CoreAdminParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
-import org.apache.solr.core.backup.BackupManager;
-import org.apache.solr.core.backup.repository.BackupRepository;
-import org.apache.solr.handler.component.ShardHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_CONF;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.COLL_PROPS;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.RANDOM;
-import static org.apache.solr.cloud.OverseerCollectionMessageHandler.SHARDS_PROP;
-import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
-import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
-import static org.apache.solr.common.params.CommonParams.NAME;
-
-public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final OverseerCollectionMessageHandler ocmh;
-
-  public RestoreCmd(OverseerCollectionMessageHandler ocmh) {
-    this.ocmh = ocmh;
-  }
-
-  @Override
-  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
-    // TODO maybe we can inherit createCollection's options/code
-
-    String restoreCollectionName = message.getStr(COLLECTION_PROP);
-    String backupName = message.getStr(NAME); // of backup
-    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
-    String asyncId = message.getStr(ASYNC);
-    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
-    Map<String, String> requestMap = new HashMap<>();
-
-    CoreContainer cc = ocmh.overseer.getZkController().getCoreContainer();
-    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
-
-    URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
-    URI backupPath = repository.resolve(location, backupName);
-    ZkStateReader zkStateReader = ocmh.zkStateReader;
-    BackupManager backupMgr = new BackupManager(repository, zkStateReader);
-
-    Properties properties = backupMgr.readBackupProperties(location, backupName);
-    String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
-    DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
-
-    // Get the Solr nodes to restore a collection.
-    final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
-        zkStateReader.getClusterState().getLiveNodes(), message, RANDOM);
-
-    int numShards = backupCollectionState.getActiveSlices().size();
-    
-    int numNrtReplicas = getInt(message, NRT_REPLICAS, backupCollectionState.getNumNrtReplicas(), 0);
-    if (numNrtReplicas == 0) {
-      numNrtReplicas = getInt(message, REPLICATION_FACTOR, backupCollectionState.getReplicationFactor(), 0);
-    }
-    int numTlogReplicas = getInt(message, TLOG_REPLICAS, backupCollectionState.getNumTlogReplicas(), 0);
-    int numPullReplicas = getInt(message, PULL_REPLICAS, backupCollectionState.getNumPullReplicas(), 0);
-    int totalReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
-    
-    int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, backupCollectionState.getMaxShardsPerNode());
-    int availableNodeCount = nodeList.size();
-    if ((numShards * totalReplicasPerShard) > (availableNodeCount * maxShardsPerNode)) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "Solr cloud with available number of nodes:%d is insufficient for"
-              + " restoring a collection with %d shards, total replicas per shard %d and maxShardsPerNode %d."
-              + " Consider increasing maxShardsPerNode value OR number of available nodes.",
-              availableNodeCount, numShards, totalReplicasPerShard, maxShardsPerNode));
-    }
-
-    //Upload the configs
-    String configName = (String) properties.get(COLL_CONF);
-    String restoreConfigName = message.getStr(COLL_CONF, configName);
-    if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
-      log.info("Using existing config {}", restoreConfigName);
-      //TODO add overwrite option?
-    } else {
-      log.info("Uploading config {}", restoreConfigName);
-      backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
-    }
-
-    log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
-        location);
-
-    //Create core-less collection
-    {
-      Map<String, Object> propMap = new HashMap<>();
-      propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
-      propMap.put("fromApi", "true"); // mostly true.  Prevents autoCreated=true in the collection state.
-      if (properties.get(STATE_FORMAT) == null) {
-        propMap.put(STATE_FORMAT, "2");
-      }
-
-      // inherit settings from input API, defaulting to the backup's setting.  Ex: replicationFactor
-      for (String collProp : COLL_PROPS.keySet()) {
-        Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
-        if (val != null) {
-          propMap.put(collProp, val);
-        }
-      }
-
-      propMap.put(NAME, restoreCollectionName);
-      propMap.put(CREATE_NODE_SET, CREATE_NODE_SET_EMPTY); //no cores
-      propMap.put(COLL_CONF, restoreConfigName);
-
-      // router.*
-      @SuppressWarnings("unchecked")
-      Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
-      for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
-        propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
-      }
-
-      Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
-      if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
-        propMap.put(SHARDS_PROP, StrUtils.join(sliceNames, ','));
-      } else {
-        propMap.put(NUM_SLICES, sliceNames.size());
-        // ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
-        //   list of names, and if so uses this instead of building it.  We clear the replica list.
-        Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
-        Map<String, Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
-        for (Slice backupSlice : backupSlices) {
-          newSlices.put(backupSlice.getName(),
-              new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
-        }
-        propMap.put(SHARDS_PROP, newSlices);
-      }
-
-      ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
-      // note: when createCollection() returns, the collection exists (no race)
-    }
-
-    DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
-    DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
-
-    //Mark all shards in CONSTRUCTION STATE while we restore the data
-    {
-      //TODO might instead createCollection accept an initial state?  Is there a race?
-      Map<String, Object> propMap = new HashMap<>();
-      propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-      for (Slice shard : restoreCollection.getSlices()) {
-        propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
-      }
-      propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
-      inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-    }
-
-    // TODO how do we leverage the RULE / SNITCH logic in createCollection?
-
-    ClusterState clusterState = zkStateReader.getClusterState();
-
-    List<String> sliceNames = new ArrayList<>();
-    restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
-    PolicyHelper.SessionWrapper sessionWrapper = null;
-
-    try {
-      List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
-          ocmh.cloudManager, clusterState,
-          nodeList, restoreCollectionName,
-          message, sliceNames,
-          numNrtReplicas, numTlogReplicas, numPullReplicas);
-      sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
-      //Create one replica per shard and copy backed up data to it
-      for (Slice slice : restoreCollection.getSlices()) {
-        log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
-        HashMap<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
-        propMap.put(COLLECTION_PROP, restoreCollectionName);
-        propMap.put(SHARD_ID_PROP, slice.getName());
-
-        if (numNrtReplicas >= 1) {
-          propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
-        } else if (numTlogReplicas >= 1) {
-          propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
-        } else {
-          throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
-              Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
-        }
-
-        // Get the first node matching the shard to restore in
-        String node;
-        for (ReplicaPosition replicaPosition : replicaPositions) {
-          if (Objects.equals(replicaPosition.shard, slice.getName())) {
-            node = replicaPosition.node;
-            propMap.put(CoreAdminParams.NODE, node);
-            replicaPositions.remove(replicaPosition);
-            break;
-          }
-        }
-
-        // add async param
-        if (asyncId != null) {
-          propMap.put(ASYNC, asyncId);
-        }
-        ocmh.addPropertyParams(message, propMap);
-
-        ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
-      }
-
-      //refresh the location copy of collection state
-      restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
-      //Copy data from backed up index to each replica
-      for (Slice slice : restoreCollection.getSlices()) {
-        ModifiableSolrParams params = new ModifiableSolrParams();
-        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
-        params.set(NAME, "snapshot." + slice.getName());
-        params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
-        params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
-
-        ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
-      }
-      ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
-
-      //Mark all shards in ACTIVE STATE
-      {
-        HashMap<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
-        propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollectionName);
-        for (Slice shard : restoreCollection.getSlices()) {
-          propMap.put(shard.getName(), Slice.State.ACTIVE.toString());
-        }
-        inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
-      }
-
-      //refresh the location copy of collection state
-      restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
-
-      if (totalReplicasPerShard > 1) {
-        log.info("Adding replicas to restored collection={}", restoreCollection);
-        for (Slice slice : restoreCollection.getSlices()) {
-
-          //Add the remaining replicas for each shard, considering it's type
-          int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
-
-          // We already created either a NRT or an TLOG replica as leader
-          if (numNrtReplicas > 0) {
-            createdNrtReplicas++;
-          } else if (createdTlogReplicas > 0) {
-            createdTlogReplicas++;
-          }
-
-          for (int i = 1; i < totalReplicasPerShard; i++) {
-            Replica.Type typeToCreate;
-            if (createdNrtReplicas < numNrtReplicas) {
-              createdNrtReplicas++;
-              typeToCreate = Replica.Type.NRT;
-            } else if (createdTlogReplicas < numTlogReplicas) {
-              createdTlogReplicas++;
-              typeToCreate = Replica.Type.TLOG;
-            } else {
-              createdPullReplicas++;
-              typeToCreate = Replica.Type.PULL;
-              assert createdPullReplicas <= numPullReplicas: "Unexpected number of replicas";
-            }
-
-            log.debug("Adding replica for shard={} collection={} of type {} ", slice.getName(), restoreCollection, typeToCreate);
-            HashMap<String, Object> propMap = new HashMap<>();
-            propMap.put(COLLECTION_PROP, restoreCollectionName);
-            propMap.put(SHARD_ID_PROP, slice.getName());
-            propMap.put(REPLICA_TYPE, typeToCreate.name());
-
-            // Get the first node matching the shard to restore in
-            String node;
-            for (ReplicaPosition replicaPosition : replicaPositions) {
-              if (Objects.equals(replicaPosition.shard, slice.getName())) {
-                node = replicaPosition.node;
-                propMap.put(CoreAdminParams.NODE, node);
-                replicaPositions.remove(replicaPosition);
-                break;
-              }
-            }
-
-            // add async param
-            if (asyncId != null) {
-              propMap.put(ASYNC, asyncId);
-            }
-            ocmh.addPropertyParams(message, propMap);
-
-            ocmh.addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
-          }
-        }
-      }
-
-      log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
-    } finally {
-      if (sessionWrapper != null) sessionWrapper.release();
-    }
-  }
-
-  private int getInt(ZkNodeProps message, String propertyName, Integer count, int defaultValue) {
-    Integer value = message.getInt(propertyName, count);
-    return value!=null ? value:defaultValue;
-  }
-}