You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by va...@apache.org on 2018/01/16 19:05:14 UTC

[08/15] lucene-solr:master: SOLR-11817: Move Collections API classes to its own package

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
new file mode 100644
index 0000000..9529ee1
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -0,0 +1,1011 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.api.collections;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.cloud.LockTree;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.OverseerMessageHandler;
+import org.apache.solr.cloud.OverseerNodePrioritizer;
+import org.apache.solr.cloud.OverseerSolrResponse;
+import org.apache.solr.cloud.OverseerTaskProcessor;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrCloseable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.SuppressForbidden;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.handler.component.ShardHandlerFactory;
+import org.apache.solr.handler.component.ShardRequest;
+import org.apache.solr.handler.component.ShardResponse;
+import org.apache.solr.util.DefaultSolrThreadFactory;
+import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.POLICY;
+import static org.apache.solr.common.cloud.DocCollection.SNITCH;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.util.Utils.makeMap;
+
+/**
+ * A {@link OverseerMessageHandler} that handles Collections API related
+ * overseer messages.
+ */
+public class OverseerCollectionMessageHandler implements OverseerMessageHandler, SolrCloseable {
+
+  // Message keys and values understood by the Collections API commands handled here.
+  public static final String NUM_SLICES = "numShards";
+
+  // createNodeSet options: the parameter names (taken from CollectionAdminParams), the shuffle
+  // default, and the literal "EMPTY" node-set value.
+  public static final boolean CREATE_NODE_SET_SHUFFLE_DEFAULT = true;
+  public static final String CREATE_NODE_SET_SHUFFLE = CollectionAdminParams.CREATE_NODE_SET_SHUFFLE_PARAM;
+  public static final String CREATE_NODE_SET_EMPTY = "EMPTY";
+  public static final String CREATE_NODE_SET = CollectionAdminParams.CREATE_NODE_SET_PARAM;
+
+  public static final String ROUTER = "router";
+
+  public static final String SHARDS_PROP = "shards";
+
+  public static final String REQUESTID = "requestid";
+
+  // Config set name; modifyCollection removes this key from the message before queueing the state update.
+  public static final String COLL_CONF = "collection.configName";
+
+  // Prefix of message keys that the addPropertyParams helpers copy verbatim.
+  public static final String COLL_PROP_PREFIX = "property.";
+
+  public static final String ONLY_IF_DOWN = "onlyIfDown";
+
+  public static final String SHARD_UNIQUE = "shardUnique";
+
+  public static final String ONLY_ACTIVE_NODES = "onlyactivenodes";
+
+  static final String SKIP_CREATE_REPLICA_IN_CLUSTER_STATE = "skipCreateReplicaInClusterState";
+
+  // Collection-level properties with their fallback values; a null value means "no default".
+  // NOTE(review): presumably applied to create requests that omit the property — confirm at the call sites.
+  public static final Map<String, Object> COLL_PROPS = Collections.unmodifiableMap(makeMap(
+      ROUTER, DocRouter.DEFAULT_NAME,
+      ZkStateReader.REPLICATION_FACTOR, "1",
+      ZkStateReader.NRT_REPLICAS, "1",
+      ZkStateReader.TLOG_REPLICAS, "0",
+      ZkStateReader.PULL_REPLICAS, "0",
+      ZkStateReader.MAX_SHARDS_PER_NODE, "1",
+      ZkStateReader.AUTO_ADD_REPLICAS, "false",
+      DocCollection.RULE, null,
+      POLICY, null,
+      SNITCH, null));
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Collaborators assigned in the constructor; cloudManager and timeSource are derived from the overseer.
+  Overseer overseer;
+  ShardHandlerFactory shardHandlerFactory;
+  String adminPath;
+  ZkStateReader zkStateReader;
+  SolrCloudManager cloudManager;
+  String myId;
+  Stats stats;
+  TimeSource timeSource;
+
+  // Lock tree used for mutual exclusion between running tasks.
+  // NOTE(review): no locks are taken in this part of the file — see LockTree for the granularity
+  // (presumably collection/shard/replica) at which tasks exclude each other.
+
+  final private LockTree lockTree = new LockTree();
+  // Worker pool (5 core / 10 max threads, 0 ms keep-alive) backed by a SynchronousQueue, so a task is
+  // handed straight to a thread instead of being queued.
+  // NOTE(review): presumably submissions are rejected while all 10 threads are busy — confirm the
+  // executor's rejection policy.
+  ExecutorService tpe = new ExecutorUtil.MDCAwareThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
+      new SynchronousQueue<>(),
+      new DefaultSolrThreadFactory("OverseerCollectionMessageHandlerThreadFactory"));
+
+  protected static final Random RANDOM;
+  static {
+    // We try to make things reproducible in the context of our tests by initializing the random instance
+    // based on the current seed
+    String seed = System.getProperty("tests.seed");
+    if (seed == null) {
+      RANDOM = new Random();
+    } else {
+      RANDOM = new Random(seed.hashCode());
+    }
+  }
+
+  // Dispatch table from Collections API action to the command that executes it; built in the constructor.
+  final Map<CollectionAction, Cmd> commandMap;
+
+  // Closed state; volatile so a change is visible across threads. Only the constructor's write is visible here.
+  private volatile boolean isClosed;
+
+  /**
+   * Creates the handler and its action-to-command dispatch table.
+   *
+   * @param overseerPrioritizer passed to the ADDROLE / REMOVEROLE commands
+   */
+  public OverseerCollectionMessageHandler(ZkStateReader zkStateReader, String myId,
+                                        final ShardHandlerFactory shardHandlerFactory,
+                                        String adminPath,
+                                        Stats stats,
+                                        Overseer overseer,
+                                        OverseerNodePrioritizer overseerPrioritizer) {
+    // Collaborators handed in directly.
+    this.zkStateReader = zkStateReader;
+    this.shardHandlerFactory = shardHandlerFactory;
+    this.adminPath = adminPath;
+    this.myId = myId;
+    this.stats = stats;
+    this.overseer = overseer;
+    // Collaborators derived from the overseer (assigned before the commands below are constructed).
+    this.cloudManager = overseer.getSolrCloudManager();
+    this.timeSource = cloudManager.getTimeSource();
+    this.isClosed = false;
+
+    ImmutableMap.Builder<CollectionAction, Cmd> handlers = ImmutableMap.builder();
+    handlers.put(REPLACENODE, new ReplaceNodeCmd(this));
+    handlers.put(DELETENODE, new DeleteNodeCmd(this));
+    handlers.put(BACKUP, new BackupCmd(this));
+    handlers.put(RESTORE, new RestoreCmd(this));
+    handlers.put(CREATESNAPSHOT, new CreateSnapshotCmd(this));
+    handlers.put(DELETESNAPSHOT, new DeleteSnapshotCmd(this));
+    handlers.put(SPLITSHARD, new SplitShardCmd(this));
+    handlers.put(ADDROLE, new OverseerRoleCmd(this, ADDROLE, overseerPrioritizer));
+    handlers.put(REMOVEROLE, new OverseerRoleCmd(this, REMOVEROLE, overseerPrioritizer));
+    // test-only mock tasks
+    handlers.put(MOCK_COLL_TASK, this::mockOperation);
+    handlers.put(MOCK_SHARD_TASK, this::mockOperation);
+    handlers.put(MOCK_REPLICA_TASK, this::mockOperation);
+    handlers.put(MIGRATESTATEFORMAT, this::migrateStateFormat);
+    handlers.put(CREATESHARD, new CreateShardCmd(this));
+    handlers.put(MIGRATE, new MigrateCmd(this));
+    handlers.put(CREATE, new CreateCollectionCmd(this));
+    handlers.put(MODIFYCOLLECTION, this::modifyCollection);
+    handlers.put(ADDREPLICAPROP, this::processReplicaAddPropertyCommand);
+    handlers.put(DELETEREPLICAPROP, this::processReplicaDeletePropertyCommand);
+    handlers.put(BALANCESHARDUNIQUE, this::balanceProperty);
+    handlers.put(REBALANCELEADERS, this::processRebalanceLeaders);
+    handlers.put(RELOAD, this::reloadCollection);
+    handlers.put(DELETE, new DeleteCollectionCmd(this));
+    handlers.put(CREATEALIAS, new CreateAliasCmd(this));
+    handlers.put(DELETEALIAS, new DeleteAliasCmd(this));
+    handlers.put(ROUTEDALIAS_CREATECOLL, new RoutedAliasCreateCollectionCmd(this));
+    handlers.put(OVERSEERSTATUS, new OverseerStatusCmd(this));
+    handlers.put(DELETESHARD, new DeleteShardCmd(this));
+    handlers.put(DELETEREPLICA, new DeleteReplicaCmd(this));
+    handlers.put(ADDREPLICA, new AddReplicaCmd(this));
+    handlers.put(MOVEREPLICA, new MoveReplicaCmd(this));
+    handlers.put(UTILIZENODE, new UtilizeNodeCmd(this));
+    commandMap = handlers.build();
+  }
+
+  /**
+   * Dispatches an overseer message to the command registered for {@code operation}.
+   * Any failure is logged and reported inside the returned response rather than thrown.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public SolrResponse processMessage(ZkNodeProps message, String operation) {
+    log.debug("OverseerCollectionMessageHandler.processMessage : {} , {}", operation, message);
+
+    NamedList response = new NamedList();
+    try {
+      Cmd cmd = commandMap.get(getCollectionAction(operation));
+      if (cmd == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
+      }
+      cmd.call(cloudManager.getClusterStateProvider().getClusterState(), message, response);
+    } catch (Exception e) {
+      // Prefer the "collection" key, falling back to "name", to identify the target in the log.
+      String target = message.getStr("collection");
+      if (target == null) {
+        target = message.getStr(NAME);
+      }
+      String failure = (target == null)
+          ? "Operation " + operation + " failed"
+          : "Collection: " + target + " operation: " + operation + " failed";
+      SolrException.log(log, failure, e);
+
+      response.add("Operation " + operation + " caused exception:", e);
+      SimpleOrderedMap details = new SimpleOrderedMap();
+      details.add("msg", e.getMessage());
+      details.add("rspCode", e instanceof SolrException ? ((SolrException) e).code() : -1);
+      response.add("exception", details);
+    }
+    return new OverseerSolrResponse(response);
+  }
+
+  @SuppressForbidden(reason = "Needs currentTimeMillis for mock requests")
+  private void mockOperation(ClusterState state, ZkNodeProps message, NamedList results) throws InterruptedException {
+    //only for test purposes
+    Thread.sleep(message.getInt("sleep", 1));
+    log.info("MOCK_TASK_EXECUTED time {} data {}", System.currentTimeMillis(), Utils.toJSONString(message));
+    results.add("MOCK_FINISHED", System.currentTimeMillis());
+  }
+
+  /**
+   * Resolves an operation name to its {@link CollectionAction}.
+   *
+   * @throws SolrException BAD_REQUEST when the name matches no action
+   */
+  private CollectionAction getCollectionAction(String operation) {
+    CollectionAction resolved = CollectionAction.get(operation);
+    if (resolved != null) {
+      return resolved;
+    }
+    throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
+  }
+
+  private void reloadCollection(ClusterState clusterState, ZkNodeProps message, NamedList results) {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.RELOAD.toString());
+
+    String asyncId = message.getStr(ASYNC);
+    Map<String, String> requestMap = null;
+    if (asyncId != null) {
+      requestMap = new HashMap<>();
+    }
+    collectionCmd(message, params, results, Replica.State.ACTIVE, asyncId, requestMap);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void processRebalanceLeaders(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws Exception {
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
+        CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
+    params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
+    params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
+    params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
+    params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
+    params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
+    params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
+
+    String baseUrl = message.getStr(BASE_URL_PROP);
+    ShardRequest sreq = new ShardRequest();
+    sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
+    // yes, they must use same admin handler path everywhere...
+    params.set("qt", adminPath);
+    sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
+    sreq.shards = new String[] {baseUrl};
+    sreq.actualShards = sreq.shards;
+    sreq.params = params;
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    shardHandler.submit(sreq, baseUrl, sreq.params);
+  }
+
+  /**
+   * ADDREPLICAPROP: checks the required keys, then forwards the message to the Overseer state update queue.
+   */
+  @SuppressWarnings("unchecked")
+  private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws Exception {
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
+    DistributedQueue stateQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+    // The operation is put first, so a queue-operation key already present in the message wins.
+    Map<String, Object> update = new HashMap<>();
+    update.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
+    update.putAll(message.getProperties());
+    stateQueue.offer(Utils.toJSON(new ZkNodeProps(update)));
+  }
+
+  /**
+   * DELETEREPLICAPROP: checks the required keys, then forwards the message to the Overseer state update queue.
+   */
+  private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws Exception {
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
+    DistributedQueue stateQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+    // The operation is put first, so a queue-operation key already present in the message wins.
+    Map<String, Object> update = new HashMap<>();
+    update.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
+    update.putAll(message.getProperties());
+    stateQueue.offer(Utils.toJSON(new ZkNodeProps(update)));
+  }
+
+  /**
+   * BALANCESHARDUNIQUE: requires non-blank collection and property keys, then forwards the message to the
+   * Overseer state update queue.
+   *
+   * @throws SolrException BAD_REQUEST if either key is blank; nothing is queued in that case
+   */
+  private void balanceProperty(ClusterState clusterState, ZkNodeProps message, NamedList results) throws Exception {
+    boolean collectionBlank = StringUtils.isBlank(message.getStr(COLLECTION_PROP));
+    boolean propertyBlank = StringUtils.isBlank(message.getStr(PROPERTY_PROP));
+    if (collectionBlank || propertyBlank) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          "The '" + COLLECTION_PROP + "' and '" + PROPERTY_PROP +
+              "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
+    }
+    DistributedQueue stateQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+    Map<String, Object> update = new HashMap<>();
+    update.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
+    update.putAll(message.getProperties());
+    stateQueue.offer(Utils.toJSON(new ZkNodeProps(update)));
+  }
+
+  /**
+   * Walks the tree of collection status and, for every replica whose reported state is anything other
+   * than "down" (not only "active") but whose node is not in {@code liveNodes}, rewrites that replica's
+   * state to "down" in place; used by CLUSTERSTATUS.
+   *
+   * @param liveNodes List of currently live node names.
+   * @param collectionProps Map of collection status information pulled directly from ZooKeeper; modified
+   *                        in place. Collections without a "shards" map and shards without a "replicas"
+   *                        map are skipped.
+   */
+  @SuppressWarnings("unchecked")
+  protected void crossCheckReplicaStateWithLiveNodes(List<String> liveNodes, NamedList<Object> collectionProps) {
+    for (Map.Entry<String, Object> coll : collectionProps) {
+      Map<String, Object> collMap = (Map<String, Object>) coll.getValue();
+      Map<String, Object> shards = (Map<String, Object>) collMap.get("shards");
+      if (shards == null) {
+        continue; // nothing to check for a collection without shards
+      }
+      for (Object nextShard : shards.values()) {
+        Map<String, Object> shardMap = (Map<String, Object>) nextShard;
+        Map<String, Object> replicas = (Map<String, Object>) shardMap.get("replicas");
+        if (replicas == null) {
+          continue; // nothing to check for a shard without replicas
+        }
+        for (Object nextReplica : replicas.values()) {
+          Map<String, Object> replicaMap = (Map<String, Object>) nextReplica;
+          if (Replica.State.getState((String) replicaMap.get(ZkStateReader.STATE_PROP)) != Replica.State.DOWN) {
+            // not down, so verify the node is live
+            String nodeName = (String) replicaMap.get(ZkStateReader.NODE_NAME_PROP);
+            if (!liveNodes.contains(nodeName)) {
+              // node is not live, so this replica is actually down
+              replicaMap.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Get collection status from cluster state, optionally narrowed to the given shards.
+   *
+   * @param collection collection map parsed from JSON-serialized {@link ClusterState}
+   * @param name  collection name
+   * @param requestedShards a set of shards to be returned in the status.
+   *                        An empty or null value indicates <b>all</b> shards.
+   * @return map of collection properties; when shards are requested this is {@code collection} itself,
+   *         with its "shards" entry replaced by a map holding only the requested shards
+   * @throws SolrException BAD_REQUEST if {@code collection} is null or a requested shard does not exist;
+   *         in the latter case {@code collection} is left unmodified
+   */
+  @SuppressWarnings("unchecked")
+  private Map<String, Object> getCollectionStatus(Map<String, Object> collection, String name, Set<String> requestedShards) {
+    if (collection == null)  {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " not found");
+    }
+    if (requestedShards == null || requestedShards.isEmpty()) {
+      return collection;
+    }
+    Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
+    Map<String, Object> selected = new HashMap<>();
+    for (String selectedShard : requestedShards) {
+      if (!shards.containsKey(selectedShard)) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
+      }
+      selected.put(selectedShard, shards.get(selectedShard));
+    }
+    // Replace the shard map once, after every requested shard has been validated, so an unknown shard
+    // name cannot leave the caller's map holding a partial selection.
+    collection.put("shards", selected);
+    return collection;
+  }
+
+  /** Runs the registered DELETEREPLICA command's deleteReplica with the given arguments. */
+  @SuppressWarnings("unchecked")
+  void deleteReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+      throws Exception {
+    DeleteReplicaCmd deleteCmd = (DeleteReplicaCmd) commandMap.get(DELETEREPLICA);
+    deleteCmd.deleteReplica(clusterState, message, results, onComplete);
+  }
+
+  /**
+   * Polls the cluster state every 100 ms until the replica is no longer present (or its shard or the whole
+   * collection is gone), or until {@code timeoutms} elapses.
+   *
+   * @return true if the replica disappeared within the timeout, false if it still exists afterwards
+   * @throws InterruptedException propagated from the sleep between polls
+   */
+  boolean waitForCoreNodeGone(String collectionName, String shard, String replicaName, int timeoutms) throws InterruptedException {
+    TimeOut timeout = new TimeOut(timeoutms, TimeUnit.MILLISECONDS, timeSource);
+    while (! timeout.hasTimedOut()) {
+      timeout.sleep(100);
+      // getCollectionOrNull rather than getCollection: the latter throws for a missing collection, which
+      // would turn "someone already deleted the collection" into an error instead of success.
+      DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
+      if (docCollection == null) { // someone already deleted the collection
+        return true;
+      }
+      Slice slice = docCollection.getSlice(shard);
+      if (slice == null || slice.getReplica(replicaName) == null) {
+        return true;
+      }
+    }
+    // replica still exists after the timeout
+    return false;
+  }
+
+  void deleteCoreNode(String collectionName, String replicaName, Replica replica, String core) throws Exception {
+    ZkNodeProps m = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+        ZkStateReader.CORE_NAME_PROP, core,
+        ZkStateReader.NODE_NAME_PROP, replica.getStr(ZkStateReader.NODE_NAME_PROP),
+        ZkStateReader.COLLECTION_PROP, collectionName,
+        ZkStateReader.CORE_NODE_NAME_PROP, replicaName,
+        ZkStateReader.BASE_URL_PROP, replica.getStr(ZkStateReader.BASE_URL_PROP));
+    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+  }
+
+  /**
+   * Throws BAD_REQUEST if any of {@code props} has no value in {@code message}; the error text lists
+   * every key in {@code props}, not only the missing ones.
+   */
+  void checkRequired(ZkNodeProps message, String... props) {
+    boolean anyMissing = Arrays.stream(props).anyMatch(prop -> message.get(prop) == null);
+    if (anyMissing) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, StrUtils.join(Arrays.asList(props),',') +" are required params" );
+    }
+  }
+
+  // TODO: should this be removed in the next release?
+  /**
+   * Queues a MIGRATESTATEFORMAT state update (once) and polls every 100 ms, for up to 30 seconds, until the
+   * collection reports state format 2; then adds an empty "success" entry to {@code results}.
+   *
+   * @throws SolrException BAD_REQUEST if the collection does not exist, SERVER_ERROR if the format has not
+   *         changed within 30 seconds
+   */
+  private void migrateStateFormat(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    final String collectionName = message.getStr(COLLECTION_PROP);
+
+    boolean firstLoop = true;
+    // wait for a while until the state format changes
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+    while (! timeout.hasTimedOut()) {
+      // getCollectionOrNull so a missing collection reaches the "not found" error below; getCollection
+      // would throw its own exception first and make this null check dead code.
+      DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
+      if (collection == null) {
+        throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + collectionName + " not found");
+      }
+      if (collection.getStateFormat() == 2) {
+        // Done.
+        results.add("success", new SimpleOrderedMap<>());
+        return;
+      }
+
+      if (firstLoop) {
+        // Actually queue the migration command.
+        firstLoop = false;
+        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, MIGRATESTATEFORMAT.toLower(), COLLECTION_PROP, collectionName);
+        Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(m));
+      }
+      timeout.sleep(100);
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not migrate state format for collection: " + collectionName);
+  }
+
+  void commit(NamedList results, String slice, Replica parentShardLeader) {
+    log.debug("Calling soft commit to make sub shard updates visible");
+    String coreUrl = new ZkCoreNodeProps(parentShardLeader).getCoreUrl();
+    // HttpShardHandler is hard coded to send a QueryRequest hence we go direct
+    // and we force open a searcher so that we have documents to show upon switching states
+    UpdateResponse updateResponse = null;
+    try {
+      updateResponse = softCommit(coreUrl);
+      processResponse(results, null, coreUrl, updateResponse, slice, Collections.emptySet());
+    } catch (Exception e) {
+      processResponse(results, e, coreUrl, updateResponse, slice, Collections.emptySet());
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to call distrib softCommit on: " + coreUrl, e);
+    }
+  }
+
+
+  /**
+   * Sends a soft commit (waitFlush=false, waitSearcher=true) to {@code url} with a 30 s connect and
+   * 120 s socket timeout, closing the client afterwards.
+   */
+  static UpdateResponse softCommit(String url) throws SolrServerException, IOException {
+    UpdateRequest commitRequest = new UpdateRequest();
+    commitRequest.setParams(new ModifiableSolrParams());
+    commitRequest.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true, true);
+
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder(url)
+        .withConnectionTimeout(30000)
+        .withSocketTimeout(120000)
+        .build()) {
+      return commitRequest.process(solrClient);
+    }
+  }
+
+  /**
+   * Finds the replica (coreNodeName) for core {@code msgCore} on node {@code msgNodeName}, re-reading the
+   * cluster state once per second for up to 320 attempts.
+   *
+   * @return the replica name
+   * @throws SolrException SERVER_ERROR if the replica is not found in time, or if the thread is interrupted
+   *         while waiting (the interrupt flag is restored first)
+   */
+  String waitForCoreNodeName(String collectionName, String msgNodeName, String msgCore) {
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
+      if (docCollection != null && docCollection.getSlicesMap() != null) {
+        Map<String,Slice> slicesMap = docCollection.getSlicesMap();
+        for (Slice slice : slicesMap.values()) {
+          for (Replica replica : slice.getReplicas()) {
+            // TODO: for really large clusters, we could 'index' on this
+
+            String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+            String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
+
+            if (nodeName.equals(msgNodeName) && core.equals(msgCore)) {
+              return replica.getName();
+            }
+          }
+        }
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Restore the flag and stop: with the flag set every later sleep fails immediately, so carrying
+        // on would turn the remaining retries into a busy loop.
+        Thread.currentThread().interrupt();
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Interrupted while waiting for coreNodeName", e);
+      }
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR, "Could not find coreNodeName");
+  }
+
+  /**
+   * Waits for slice {@code sliceName} to appear in the collection, re-reading the cluster state once per
+   * second for up to 320 attempts.
+   *
+   * @throws SolrException SERVER_ERROR if the collection is not in the cluster state, or if the slice has
+   *         not appeared after all attempts
+   * @throws InterruptedException if interrupted while sleeping between attempts
+   */
+  void waitForNewShard(String collectionName, String sliceName) throws KeeperException, InterruptedException {
+    log.debug("Waiting for slice {} of collection {} to be available", sliceName, collectionName);
+    RTimer timer = new RTimer();
+    int retryCount = 320;
+    while (retryCount-- > 0) {
+      // getCollectionOrNull so a missing collection reaches the SERVER_ERROR below; getCollection would
+      // throw its own exception first and make this null check dead code.
+      DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(collectionName);
+      if (collection == null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Unable to find collection: " + collectionName + " in clusterstate");
+      }
+      Slice slice = collection.getSlice(sliceName);
+      if (slice != null) {
+        log.debug("Waited for {}ms for slice {} of collection {} to be available",
+            timer.getTime(), sliceName, collectionName);
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    throw new SolrException(ErrorCode.SERVER_ERROR,
+        "Could not find new slice " + sliceName + " in collection " + collectionName
+            + " even after waiting for " + timer.getTime() + "ms"
+    );
+  }
+
+  /**
+   * Returns the intersection of two hash ranges, or null when either is null or they do not overlap.
+   * When one range is a subset of the other, that range instance itself is returned.
+   */
+  DocRouter.Range intersect(DocRouter.Range a, DocRouter.Range b) {
+    if (a == null || b == null || !a.overlaps(b)) {
+      return null;
+    }
+    if (a.isSubsetOf(b)) {
+      return a;
+    }
+    if (b.isSubsetOf(a)) {
+      return b;
+    }
+    // Partial overlap: the intersection runs from the larger lower bound to the smaller upper bound.
+    return new DocRouter.Range(Math.max(a.min, b.min), Math.min(a.max, b.max));
+  }
+
+  void sendShardRequest(String nodeName, ModifiableSolrParams params,
+                        ShardHandler shardHandler, String asyncId,
+                        Map<String, String> requestMap) {
+    sendShardRequest(nodeName, params, shardHandler, asyncId, requestMap, adminPath, zkStateReader);
+
+  }
+
+  /**
+   * Submits a private admin request to node {@code nodeName} through {@code shardHandler}.
+   *
+   * @param nodeName     target node; resolved to a base URL via {@code zkStateReader}
+   * @param params       request parameters; {@code qt} (and the async id, when {@code asyncId} is set) are added
+   * @param asyncId      if non-null, a per-request async id derived from it is set on the request and recorded
+   *                     in {@code requestMap} under {@code nodeName}
+   * @param requestMap   must be non-null whenever {@code asyncId} is non-null
+   * @param adminPath    admin handler path, sent as {@code qt}
+   */
+  public static void sendShardRequest(String nodeName, ModifiableSolrParams params, ShardHandler shardHandler,
+                                      String asyncId, Map<String, String> requestMap, String adminPath,
+                                      ZkStateReader zkStateReader) {
+    if (asyncId != null) {
+      // Suffix with the current nano time so each core admin request gets its own async id.
+      String coreAdminAsyncId = asyncId + Math.abs(System.nanoTime());
+      params.set(ASYNC, coreAdminAsyncId);
+      requestMap.put(nodeName, coreAdminAsyncId);
+    }
+
+    ShardRequest sreq = new ShardRequest();
+    params.set("qt", adminPath);
+    sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
+    String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+    sreq.shards = new String[]{replica};
+    sreq.actualShards = sreq.shards;
+    sreq.nodeName = nodeName;
+    sreq.params = params;
+
+    shardHandler.submit(sreq, replica, sreq.params);
+  }
+
+  /** Copies every {@code property.*} entry of {@code message} into {@code params} as a string value. */
+  void addPropertyParams(ZkNodeProps message, ModifiableSolrParams params) {
+    for (String propertyKey : message.keySet()) {
+      if (!propertyKey.startsWith(COLL_PROP_PREFIX)) {
+        continue;
+      }
+      params.set(propertyKey, message.getStr(propertyKey));
+    }
+  }
+
+  /** Copies every {@code property.*} entry of {@code message} into {@code map} as a string value. */
+  void addPropertyParams(ZkNodeProps message, Map<String, Object> map) {
+    for (String propertyKey : message.keySet()) {
+      if (!propertyKey.startsWith(COLL_PROP_PREFIX)) {
+        continue;
+      }
+      map.put(propertyKey, message.getStr(propertyKey));
+    }
+  }
+
+
+  /**
+   * MODIFYCOLLECTION: if a new config set is given, validates it, links it to the collection and reloads
+   * the collection. It then forwards the message to the state update queue and waits up to 30 seconds
+   * until every modified property is visible in the cluster state.
+   *
+   * <p>A property whose message value is null counts as visible once the collection reports no value for it.
+   *
+   * @throws SolrException SERVER_ERROR if the changes are not visible within 30 seconds
+   */
+  private void modifyCollection(ClusterState clusterState, ZkNodeProps message, NamedList results)
+      throws Exception {
+
+    final String collectionName = message.getStr(ZkStateReader.COLLECTION_PROP);
+    //the rest of the processing is based on writing cluster state properties
+    //remove the property here to avoid any errors down the pipeline due to this property appearing
+    String configName = (String) message.getProperties().remove(COLL_CONF);
+
+    if(configName != null) {
+      validateConfigOrThrowSolrException(configName);
+
+      boolean isLegacyCloud =  Overseer.isLegacy(zkStateReader);
+      createConfNode(cloudManager.getDistribStateManager(), configName, collectionName, isLegacyCloud);
+      reloadCollection(null, new ZkNodeProps(NAME, collectionName), results);
+    }
+
+    // getStateUpdateQueue is static; call it through the class as the rest of this file does.
+    Overseer.getStateUpdateQueue(zkStateReader.getZkClient()).offer(Utils.toJSON(message));
+
+    TimeOut timeout = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+    boolean areChangesVisible = true;
+    while (!timeout.hasTimedOut()) {
+      DocCollection collection = cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName);
+      areChangesVisible = true;
+      for (Map.Entry<String,Object> updateEntry : message.getProperties().entrySet()) {
+        String updateKey = updateEntry.getKey();
+        if (updateKey.equals(ZkStateReader.COLLECTION_PROP) || updateKey.equals(Overseer.QUEUE_OPERATION)) {
+          continue;
+        }
+        // Compare null-safely: the collection may not carry the property yet, and a null message value is
+        // treated as "property absent"; calling equals on collection.get(...) would throw an NPE in both cases.
+        Object expected = updateEntry.getValue();
+        Object actual = collection.get(updateKey);
+        if (expected == null ? actual != null : !expected.equals(actual)) {
+          areChangesVisible = false;
+          break;
+        }
+      }
+      if (areChangesVisible) break;
+      timeout.sleep(100);
+    }
+
+    if (!areChangesVisible)
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not modify collection " + message);
+  }
+
+  /**
+   * Deletes {@code collectionName} by running the registered DELETE command against the current
+   * cluster state. The cleanup itself is logged at error level.
+   */
+  void cleanupCollection(String collectionName, NamedList results) throws Exception {
+    log.error("Cleaning up collection [{}].", collectionName);
+    Map<String, Object> deleteProps = makeMap(
+        Overseer.QUEUE_OPERATION, DELETE.toLower(),
+        NAME, collectionName);
+    ZkNodeProps deleteMessage = new ZkNodeProps(deleteProps);
+    commandMap.get(DELETE).call(zkStateReader.getClusterState(), deleteMessage, results);
+  }
+
+  /**
+   * Polls the cluster state every 100 ms, for at most 30 seconds, until each name in
+   * {@code coreNames} matches the core name of some replica of {@code collectionName}.
+   *
+   * @return map from core name to the matching replica
+   * @throws SolrException SERVER_ERROR when not every core is visible before the timeout
+   * @throws InterruptedException if interrupted while sleeping between polls
+   */
+  Map<String, Replica> waitToSeeReplicasInState(String collectionName, Collection<String> coreNames) throws InterruptedException {
+    Map<String, Replica> found = new HashMap<>();
+    TimeOut deadline = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+    while (true) {
+      DocCollection docCollection = zkStateReader.getClusterState().getCollection(collectionName);
+      for (String wanted : coreNames) {
+        if (found.containsKey(wanted)) {
+          continue; // already located
+        }
+        for (Slice shard : docCollection.getSlices()) {
+          for (Replica candidate : shard.getReplicas()) {
+            if (wanted.equals(candidate.getStr(ZkStateReader.CORE_NAME_PROP))) {
+              found.put(wanted, candidate);
+              break; // stops scanning this slice only; remaining slices are still checked
+            }
+          }
+        }
+      }
+
+      if (found.size() == coreNames.size()) {
+        return found;
+      }
+      log.debug("Expecting {} cores but found {}", coreNames, found);
+
+      if (deadline.hasTimedOut()) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out waiting to see all replicas: " + coreNames + " in cluster state. Last state: " + docCollection);
+      }
+      Thread.sleep(100);
+    }
+  }
+
+  /**
+   * Delegates to the registered {@link AddReplicaCmd}, passing every argument through unchanged.
+   *
+   * @return whatever {@code AddReplicaCmd.addReplica} returns
+   */
+  ZkNodeProps addReplica(ClusterState clusterState, ZkNodeProps message, NamedList results, Runnable onComplete)
+      throws Exception {
+    AddReplicaCmd addReplicaCmd = (AddReplicaCmd) commandMap.get(ADDREPLICA);
+    return addReplicaCmd.addReplica(clusterState, message, results, onComplete);
+  }
+
+  /** Same as the overload taking {@code okayExceptions}, with no exceptions treated as acceptable. */
+  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                        String asyncId, Map<String, String> requestMap) {
+    Set<String> noOkayExceptions = Collections.emptySet();
+    processResponses(results, shardHandler, abortOnError, msgOnError, asyncId, requestMap, noOkayExceptions);
+  }
+
+  /**
+   * Records every completed shard response in {@code results}. For async requests
+   * ({@code asyncId != null}) it then waits for each core admin request tracked in
+   * {@code requestMap} to finish, and clears the map.
+   *
+   * @param abortOnError   when true, the first response carrying an exception causes all remaining
+   *                       responses to be drained and discarded, and a SERVER_ERROR SolrException
+   *                       with {@code msgOnError} (and that exception as cause) to be thrown
+   * @param okayExceptions root throwables (as reported by RemoteSolrException#getRootThrowable)
+   *                       that are recorded as success rather than failure
+   */
+  void processResponses(NamedList results, ShardHandler shardHandler, boolean abortOnError, String msgOnError,
+                                String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+    for (ShardResponse response = shardHandler.takeCompletedOrError();
+         response != null;
+         response = shardHandler.takeCompletedOrError()) {
+      processResponse(results, response, okayExceptions);
+      Throwable failure = response.getException();
+      if (abortOnError && failure != null) {
+        // drain pending requests before failing
+        while (shardHandler.takeCompletedOrError() != null) {
+          // discard
+        }
+        throw new SolrException(ErrorCode.SERVER_ERROR, msgOnError, failure);
+      }
+    }
+
+    if (asyncId == null) {
+      return;
+    }
+    // async: wait for the core admin calls to complete before returning
+    waitForAsyncCallsToComplete(requestMap, results);
+    requestMap.clear();
+  }
+
+
+  /**
+   * Verifies that a config set named {@code configName} has data under
+   * {@code ZkConfigManager.CONFIGS_ZKNODE}.
+   *
+   * @throws SolrException BAD_REQUEST when the config set cannot be found
+   */
+  void validateConfigOrThrowSolrException(String configName) throws IOException, KeeperException, InterruptedException {
+    String configPath = ZkConfigManager.CONFIGS_ZKNODE + "/" + configName;
+    if (!cloudManager.getDistribStateManager().hasData(configPath)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Can not find the specified config set: " + configName);
+    }
+  }
+
+  /**
+   * Records {@code configName} for collection {@code coll} as JSON
+   * ({@code ZkController.CONFIGNAME_PROP -> configName}) in the znode
+   * {@code ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll}. Existing data is overwritten (any
+   * version); otherwise the path is created as PERSISTENT.
+   *
+   * <p>This doesn't validate the config (path) itself and is just responsible for creating the
+   * confNode. That check should be done before the config node is created.
+   *
+   * <p>When {@code configName} is null nothing is written: in legacy cloud mode a warning is
+   * logged, otherwise a BAD_REQUEST SolrException is thrown.
+   */
+  public static void createConfNode(DistribStateManager stateManager, String configName, String coll, boolean isLegacyCloud) throws IOException, AlreadyExistsException, BadVersionException, KeeperException, InterruptedException {
+    if (configName == null) {
+      if (!isLegacyCloud) {
+        throw new SolrException(ErrorCode.BAD_REQUEST,"Unable to get config name");
+      }
+      log.warn("Could not obtain config name");
+      return;
+    }
+
+    String collDir = ZkStateReader.COLLECTIONS_ZKNODE + "/" + coll;
+    log.debug("creating collections conf node {} ", collDir);
+    byte[] payload = Utils.toJSON(makeMap(ZkController.CONFIGNAME_PROP, configName));
+    if (!stateManager.hasData(collDir)) {
+      stateManager.makePath(collDir, payload, CreateMode.PERSISTENT, false);
+    } else {
+      stateManager.setData(collDir, payload, -1);
+    }
+  }
+  
+  /** Same as the overload taking {@code okayExceptions}, with no exceptions treated as acceptable. */
+  private void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+                             NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap) {
+    Set<String> noOkayExceptions = Collections.emptySet();
+    collectionCmd(message, params, results, stateMatcher, asyncId, requestMap, noOkayExceptions);
+  }
+
+
+  /**
+   * Sends a core admin request built from {@code params} to the eligible replicas (see sliceCmd)
+   * of every slice of the collection named by the message's NAME property. It then records all
+   * responses in {@code results} without aborting on errors.
+   */
+  void collectionCmd(ZkNodeProps message, ModifiableSolrParams params,
+                     NamedList results, Replica.State stateMatcher, String asyncId, Map<String, String> requestMap, Set<String> okayExceptions) {
+    log.info("Executing Collection Cmd : {}", params);
+    String collectionName = message.getStr(NAME);
+    ShardHandler handler = shardHandlerFactory.getShardHandler();
+    ClusterState state = zkStateReader.getClusterState();
+    DocCollection docCollection = state.getCollection(collectionName);
+
+    for (Slice slice : docCollection.getSlices()) {
+      sliceCmd(state, params, stateMatcher, slice, handler, asyncId, requestMap);
+    }
+    processResponses(results, handler, false, null, asyncId, requestMap, okayExceptions);
+  }
+
+  /**
+   * Submits one core admin request per replica of {@code slice} that is on a live node and, when
+   * {@code stateMatcher} is non-null, is in that state. Each request uses a copy of {@code params}
+   * with {@code CoreAdminParams.CORE} set to the replica's core name.
+   */
+  void sliceCmd(ClusterState clusterState, ModifiableSolrParams params, Replica.State stateMatcher,
+                Slice slice, ShardHandler shardHandler, String asyncId, Map<String, String> requestMap) {
+
+    for (Replica replica : slice.getReplicas()) {
+      String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
+      if (!clusterState.liveNodesContain(nodeName)) {
+        continue;
+      }
+      if (stateMatcher != null
+          && Replica.State.getState(replica.getStr(ZkStateReader.STATE_PROP)) != stateMatcher) {
+        continue;
+      }
+      // For thread safety, each request gets its own simple clone of the shared params
+      ModifiableSolrParams perCoreParams = new ModifiableSolrParams();
+      perCoreParams.add(params);
+      perCoreParams.set(CoreAdminParams.CORE, replica.getStr(ZkStateReader.CORE_NAME_PROP));
+      sendShardRequest(nodeName, perCoreParams, shardHandler, asyncId, requestMap);
+    }
+  }
+  
+  /** Unpacks {@code srsp} and records it in {@code results} via the field-level overload. */
+  private void processResponse(NamedList results, ShardResponse srsp, Set<String> okayExceptions) {
+    processResponse(results, srsp.getException(), srsp.getNodeName(), srsp.getSolrResponse(),
+        srsp.getShard(), okayExceptions);
+  }
+
+  /**
+   * Records one shard outcome in {@code results}.
+   *
+   * <p>An exception counts as a failure unless it is a RemoteSolrException whose root throwable is
+   * in {@code okayExceptions}. A failure is logged and stored as "ExceptionClass:message" under the
+   * node name in the "failure" map. Anything else stores the shard's response under the node name
+   * in the "success" map. Both maps are created on first use.
+   */
+  @SuppressWarnings("unchecked")
+  private void processResponse(NamedList results, Throwable e, String nodeName, SolrResponse solrResponse, String shard, Set<String> okayExceptions) {
+    String rootThrowable = (e instanceof RemoteSolrException)
+        ? ((RemoteSolrException) e).getRootThrowable()
+        : null;
+    boolean tolerated = rootThrowable != null && okayExceptions.contains(rootThrowable);
+
+    if (e == null || tolerated) {
+      SimpleOrderedMap success = (SimpleOrderedMap) results.get("success");
+      if (success == null) {
+        success = new SimpleOrderedMap();
+        results.add("success", success);
+      }
+      success.add(nodeName, solrResponse.getResponse());
+      return;
+    }
+
+    log.error("Error from shard: {}", shard, e);
+    SimpleOrderedMap failure = (SimpleOrderedMap) results.get("failure");
+    if (failure == null) {
+      failure = new SimpleOrderedMap();
+      results.add("failure", failure);
+    }
+    failure.add(nodeName, e.getClass().getName() + ":" + e.getMessage());
+  }
+
+  /**
+   * For every (node name -> async request id) entry in {@code requestMap}, blocks until that core
+   * admin request finishes. The final status response is added to {@code results} under the
+   * request id.
+   */
+  @SuppressWarnings("unchecked")
+  private void waitForAsyncCallsToComplete(Map<String, String> requestMap, NamedList results) {
+    for (Map.Entry<String, String> pending : requestMap.entrySet()) {
+      String nodeName = pending.getKey();
+      String requestId = pending.getValue();
+      log.debug("I am Waiting for :{}/{}", nodeName, requestId);
+      results.add(requestId, waitForCoreAdminAsyncCallToComplete(nodeName, requestId));
+    }
+  }
+
+  /**
+   * Repeatedly sends a REQUESTSTATUS core admin request for {@code requestId} to
+   * {@code nodeName} until the task is no longer "running". It sleeps 1 s between "running" polls.
+   *
+   * @return the status response when the task is "completed" or "failed", or a list with
+   *         {@code STATUS=failed} when the node returned no response body
+   * @throws SolrException BAD_REQUEST when the task is still "notfound" after 5 retries, or when
+   *         the status is missing or unrecognized
+   */
+  private NamedList waitForCoreAdminAsyncCallToComplete(String nodeName, String requestId) {
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.REQUESTSTATUS.toString());
+    params.set(CoreAdminParams.REQUESTID, requestId);
+    int counter = 0;
+    ShardRequest sreq;
+    do {
+      sreq = new ShardRequest();
+      params.set("qt", adminPath);
+      sreq.purpose = 1;
+      String replica = zkStateReader.getBaseUrlForNodeName(nodeName);
+      sreq.shards = new String[] {replica};
+      sreq.actualShards = sreq.shards;
+      sreq.params = params;
+
+      shardHandler.submit(sreq, replica, sreq.params);
+
+      ShardResponse srsp;
+      do {
+        srsp = shardHandler.takeCompletedOrError();
+        if (srsp != null) {
+          NamedList results = new NamedList();
+          processResponse(results, srsp, Collections.emptySet());
+          if (srsp.getSolrResponse().getResponse() == null) {
+            NamedList response = new NamedList();
+            response.add("STATUS", "failed");
+            return response;
+          }
+
+          // Constant-first comparisons: a missing STATUS now reaches the "Invalid status request"
+          // error below instead of throwing a NullPointerException.
+          String r = (String) srsp.getSolrResponse().getResponse().get("STATUS");
+          if ("running".equals(r)) {
+            log.debug("The task is still RUNNING, continuing to wait.");
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+            continue;
+
+          } else if ("completed".equals(r)) {
+            log.debug("The task is COMPLETED, returning");
+            return srsp.getSolrResponse().getResponse();
+          } else if ("failed".equals(r)) {
+            // TODO: Improve this. Get more information.
+            log.debug("The task is FAILED, returning");
+            return srsp.getSolrResponse().getResponse();
+          } else if ("notfound".equals(r)) {
+            log.debug("The task is notfound, retry");
+            if (counter++ < 5) {
+              try {
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+                // previously swallowed; restore the interrupt flag as the "running" branch does
+                Thread.currentThread().interrupt();
+              }
+              break;
+            }
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request for requestId: " + requestId + " " + srsp.getSolrResponse().getResponse().get("STATUS") +
+                " retried " + counter + " times");
+          } else {
+            throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid status request " + srsp.getSolrResponse().getResponse().get("STATUS"));
+          }
+        }
+      } while (srsp != null);
+    } while(true);
+  }
+
+  @Override
+  public String getName() {
+    return "Overseer Collection Message Handler";
+  }
+
+  /** Timer/stats key for {@code operation}: the operation name prefixed with "collection_". */
+  @Override
+  public String getTimerName(String operation) {
+    final String prefix = "collection_";
+    return prefix + operation;
+  }
+
+  /** Task/lock key for a message: its COLLECTION_PROP when present, otherwise its NAME. */
+  @Override
+  public String getTaskKey(ZkNodeProps message) {
+    if (message.containsKey(COLLECTION_PROP)) {
+      return message.getStr(COLLECTION_PROP);
+    }
+    return message.getStr(NAME);
+  }
+
+
+  /** Id of the task batch that {@link #lockSession} was created for; -1 before the first batch. */
+  private long sessionId = -1;
+  /** LockTree session shared by the tasks of the current batch. */
+  private LockTree.Session lockSession;
+
+  /**
+   * Requests a lock, from the current batch's LockTree session, on the path (task key, shard,
+   * replica) for the operation named in {@code message}. A new session is started whenever
+   * {@code taskBatch} has a different id than the session's batch. If no tasks are running at
+   * that point, the lock tree is cleared first.
+   */
+  @Override
+  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
+    if (lockSession == null || sessionId != taskBatch.getId()) {
+      //this is always called in the same thread.
+      //Each batch is supposed to have a new taskBatch
+      //So if taskBatch changes we must create a new Session
+      // also check if the running tasks are empty. If yes, clear lockTree
+      // this will ensure that locks are not 'leaked'
+      if(taskBatch.getRunningTasks() == 0) lockTree.clear();
+      lockSession = lockTree.getSession();
+      // Remember the batch this session belongs to. Without this assignment the id check above
+      // never matches, so a fresh session was created for every single task.
+      sessionId = taskBatch.getId();
+    }
+    return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
+        Arrays.asList(
+            getTaskKey(message),
+            message.getStr(ZkStateReader.SHARD_ID_PROP),
+            message.getStr(ZkStateReader.REPLICA_PROP)));
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    this.isClosed = true;
+    if (tpe != null) {
+      if (!tpe.isShutdown()) {
+        ExecutorUtil.shutdownAndAwaitTermination(tpe);
+      }
+    }
+  }
+
+  /** @return the current value of the {@code isClosed} flag, which {@link #close()} sets */
+  @Override
+  public boolean isClosed() {
+    return this.isClosed;
+  }
+
+  /** A collection command this handler dispatches to (e.g. OverseerRoleCmd, ReplaceNodeCmd). */
+  @FunctionalInterface
+  protected interface Cmd {
+    /**
+     * Runs the command described by {@code message} against cluster {@code state}, adding any
+     * output to {@code results}.
+     */
+    void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java
new file mode 100644
index 0000000..16f9327
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerRoleCmd.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.OverseerNodePrioritizer;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
+
+/**
+ * Handles ADDROLE / REMOVEROLE. It adds {@code node} to, or removes it from, the node list for
+ * {@code role} in the JSON stored at {@code ZkStateReader.ROLES}, creating that znode if needed.
+ * It then re-prioritizes overseer nodes on a separate thread.
+ */
+public class OverseerRoleCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+  private final CollectionAction operation;
+  private final OverseerNodePrioritizer overseerPrioritizer;
+
+  /**
+   * @param ocmh        handler supplying the ZK state reader and this overseer's id
+   * @param operation   ADDROLE or REMOVEROLE; any other action leaves the node list unchanged
+   * @param prioritizer invoked after the roles are written
+   */
+  public OverseerRoleCmd(OverseerCollectionMessageHandler ocmh, CollectionAction operation, OverseerNodePrioritizer prioritizer) {
+    this.ocmh = ocmh;
+    this.operation = operation;
+    this.overseerPrioritizer = prioritizer;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    SolrZkClient zkClient = ocmh.zkStateReader.getZkClient();
+    String node = message.getStr("node");
+    String roleName = message.getStr("role");
+
+    boolean rolesNodeExists = zkClient.exists(ZkStateReader.ROLES, true);
+    Map<String, Object> roles;
+    if (rolesNodeExists) {
+      roles = (Map<String, Object>) Utils.fromJSON(zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true));
+    } else {
+      roles = new LinkedHashMap<>(1);
+    }
+
+    List<String> nodeList = (List<String>) roles.get(roleName);
+    if (nodeList == null) {
+      nodeList = new ArrayList<>();
+      roles.put(roleName, nodeList);
+    }
+
+    if (operation == ADDROLE) {
+      log.info("Overseer role added to {}", node);
+      if (!nodeList.contains(node)) {
+        nodeList.add(node);
+      }
+    } else if (operation == REMOVEROLE) {
+      log.info("Overseer role removed from {}", node);
+      nodeList.remove(node);
+    }
+
+    byte[] rolesJson = Utils.toJSON(roles);
+    if (rolesNodeExists) {
+      zkClient.setData(ZkStateReader.ROLES, rolesJson, true);
+    } else {
+      zkClient.create(ZkStateReader.ROLES, rolesJson, CreateMode.PERSISTENT, true);
+    }
+
+    // With many nodes this step may take long enough for the command to time out, and dedicated
+    // overseers are most likely used when there are many nodes. So run it on a separate thread.
+    Runnable prioritize = () -> {
+      try {
+        overseerPrioritizer.prioritizeOverseerNodes(ocmh.myId);
+      } catch (Exception e) {
+        log.error("Error in prioritizing Overseer", e);
+      }
+    };
+    new Thread(prioritize).start();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
new file mode 100644
index 0000000..6f0bbfd
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerStatusCmd.java
@@ -0,0 +1,113 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import com.codahale.metrics.Timer;
+import org.apache.solr.cloud.OverseerTaskProcessor;
+import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.util.stats.MetricUtils;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles OVERSEERSTATUS. It adds the following to {@code results}:
+ * <ul>
+ *   <li>the overseer leader node;</li>
+ *   <li>the child counts of the three overseer queue znodes;</li>
+ *   <li>every entry of the overseer {@link Stats}, grouped by key prefix into
+ *   "overseer_operations", "collection_operations", "overseer_queue", "overseer_internal_queue"
+ *   and "collection_queue".</li>
+ * </ul>
+ */
+public class OverseerStatusCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Stats key prefixes; the prefix is stripped from the key in the reported name.
+  private static final String COLLECTION_OP_PREFIX = "collection_";
+  private static final String STATE_UPDATE_QUEUE_PREFIX = "/overseer/queue_";
+  private static final String WORK_QUEUE_PREFIX = "/overseer/queue-work_";
+  private static final String COLLECTION_QUEUE_PREFIX = "/overseer/collection-queue-work_";
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public OverseerStatusCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    String leaderNode = OverseerTaskProcessor.getLeaderNode(zkStateReader.getZkClient());
+    results.add("leader", leaderNode);
+    results.add("overseer_queue_size", childCount(zkStateReader, "/overseer/queue"));
+    results.add("overseer_work_queue_size", childCount(zkStateReader, "/overseer/queue-work"));
+    results.add("overseer_collection_queue_size", childCount(zkStateReader, "/overseer/collection-queue-work"));
+
+    NamedList overseerStats = new NamedList();
+    NamedList collectionStats = new NamedList();
+    NamedList stateUpdateQueueStats = new NamedList();
+    NamedList workQueueStats = new NamedList();
+    NamedList collectionQueueStats = new NamedList();
+    Stats stats = ocmh.stats;
+    for (Map.Entry<String, Stats.Stat> entry : stats.getStats().entrySet()) {
+      String key = entry.getKey();
+      NamedList<Object> lst = new SimpleOrderedMap<>();
+      if (key.startsWith(COLLECTION_OP_PREFIX)) {
+        collectionStats.add(key.substring(COLLECTION_OP_PREFIX.length()), lst);
+        addRequestAndErrorCounts(stats, key, lst);
+        List<Stats.FailedOp> failureDetails = stats.getFailureDetails(key);
+        if (failureDetails != null) {
+          lst.add("recent_failures", describeFailures(failureDetails));
+        }
+      } else if (key.startsWith(STATE_UPDATE_QUEUE_PREFIX)) {
+        stateUpdateQueueStats.add(key.substring(STATE_UPDATE_QUEUE_PREFIX.length()), lst);
+      } else if (key.startsWith(WORK_QUEUE_PREFIX)) {
+        workQueueStats.add(key.substring(WORK_QUEUE_PREFIX.length()), lst);
+      } else if (key.startsWith(COLLECTION_QUEUE_PREFIX)) {
+        collectionQueueStats.add(key.substring(COLLECTION_QUEUE_PREFIX.length()), lst);
+      } else {
+        // any other key is an overseer operation
+        overseerStats.add(key, lst);
+        addRequestAndErrorCounts(stats, key, lst);
+      }
+      Timer timer = entry.getValue().requestTime;
+      MetricUtils.addMetrics(lst, timer);
+    }
+    results.add("overseer_operations", overseerStats);
+    results.add("collection_operations", collectionStats);
+    results.add("overseer_queue", stateUpdateQueueStats);
+    results.add("overseer_internal_queue", workQueueStats);
+    results.add("collection_queue", collectionQueueStats);
+  }
+
+  /** Number of children of the znode at {@code path}, as reported by its {@link Stat}. */
+  private static int childCount(ZkStateReader zkStateReader, String path) throws Exception {
+    Stat stat = new Stat();
+    zkStateReader.getZkClient().getData(path, null, stat, true);
+    return stat.getNumChildren();
+  }
+
+  /** Adds the "requests" (success count) and "errors" entries recorded for {@code key}. */
+  private static void addRequestAndErrorCounts(Stats stats, String key, NamedList<Object> lst) {
+    int successes = stats.getSuccessCount(key);
+    int errors = stats.getErrorCount(key);
+    lst.add("requests", successes);
+    lst.add("errors", errors);
+  }
+
+  /** Converts recorded failed operations into "request"/"response" entries. */
+  private static List<SimpleOrderedMap<Object>> describeFailures(List<Stats.FailedOp> failureDetails) {
+    List<SimpleOrderedMap<Object>> failures = new ArrayList<>();
+    for (Stats.FailedOp failedOp : failureDetails) {
+      SimpleOrderedMap<Object> fail = new SimpleOrderedMap<>();
+      fail.add("request", failedOp.req.getProperties());
+      fail.add("response", failedOp.resp.getResponse());
+      failures.add(fail);
+    }
+    return failures;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
new file mode 100644
index 0000000..35d2379
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.cloud.ActiveReplicaWatcher;
+import org.apache.solr.common.SolrCloseableLatch;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * Handles REPLACENODE.
+ *
+ * <p>For every replica hosted on the source node, a replica of the same shard is added on the
+ * target node. Leader replicas are waited on through a collection state watcher; with
+ * waitForFinalState, every replica is. Once all copies succeed, the replicas on the source node
+ * are cleaned up.
+ *
+ * <p>If any addition fails or a wait times out, the replicas created on the target are deleted
+ * instead.
+ */
+public class ReplaceNodeCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public ReplaceNodeCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Reads CollectionParams.SOURCE_NODE / TARGET_NODE (falling back to "source" / "target"),
+   * WAIT_FOR_FINAL_STATE (default false), "async", "timeout" in seconds (default 600, applied to
+   * each of the two waits separately) and "parallel" (default false) from {@code message}.
+   *
+   * @throws SolrException BAD_REQUEST when source or target is missing or is not a live node
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    String source = message.getStr(CollectionParams.SOURCE_NODE, message.getStr("source"));
+    String target = message.getStr(CollectionParams.TARGET_NODE, message.getStr("target"));
+    boolean waitForFinalState = message.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
+    if (source == null || target == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "sourceNode and targetNode are required params" );
+    }
+    String async = message.getStr("async");
+    int timeout = message.getInt("timeout", 10 * 60); // 10 minutes
+    boolean parallel = message.getBool("parallel", false);
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    if (!clusterState.liveNodesContain(source)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Source Node: " + source + " is not live");
+    }
+    if (!clusterState.liveNodesContain(target)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
+    }
+    List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
+    // how many leaders are we moving? for these replicas we have to make sure that either:
+    // * another existing replica can become a leader, or
+    // * we wait until the newly created replica completes recovery (and can become the new leader)
+    // If waitForFinalState=true we wait for all replicas
+    int numLeaders = 0;
+    for (ZkNodeProps props : sourceReplicas) {
+      if (props.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+        numLeaders++;
+      }
+    }
+    // map of collectionName_coreNodeName to watchers
+    Map<String, CollectionStateWatcher> watchers = new HashMap<>();
+    // same keys -> collection the watcher was registered on (needed to unregister it)
+    Map<String, String> watcherCollections = new HashMap<>();
+    List<ZkNodeProps> createdReplicas = new ArrayList<>();
+
+    AtomicBoolean anyOneFailed = new AtomicBoolean(false);
+    SolrCloseableLatch countDownLatch = new SolrCloseableLatch(sourceReplicas.size(), ocmh);
+
+    SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ocmh);
+
+    for (ZkNodeProps sourceReplica : sourceReplicas) {
+      NamedList nl = new NamedList();
+      log.info("Going to create replica for collection={} shard={} on node={}", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
+      ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, target);
+      if(async!=null) msg.getProperties().put(ASYNC, async);
+      final ZkNodeProps addedReplica = ocmh.addReplica(clusterState,
+          msg, nl, () -> {
+            countDownLatch.countDown();
+            if (nl.get("failure") != null) {
+              String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+                  " on node=%s", sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
+              log.warn(errorString);
+              // one replica creation failed. Make the best attempt to
+              // delete all the replicas created so far in the target
+              // and exit
+              synchronized (results) {
+                results.add("failure", errorString);
+                anyOneFailed.set(true);
+              }
+            } else {
+              log.debug("Successfully created replica for collection={} shard={} on node={}",
+                  sourceReplica.getStr(COLLECTION_PROP), sourceReplica.getStr(SHARD_ID_PROP), target);
+            }
+          });
+
+      if (addedReplica != null) {
+        createdReplicas.add(addedReplica);
+        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+          String collectionName = sourceReplica.getStr(COLLECTION_PROP);
+          String key = collectionName + "_" + replicaName;
+          CollectionStateWatcher watcher;
+          if (waitForFinalState) {
+            watcher = new ActiveReplicaWatcher(collectionName, null,
+                Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
+          } else {
+            watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
+                addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
+          }
+          watchers.put(key, watcher);
+          watcherCollections.put(key, collectionName);
+          log.debug("--- adding " + key + ", " + watcher);
+          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+        } else {
+          log.debug("--- not waiting for " + addedReplica);
+        }
+      }
+    }
+
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
+    }
+
+    // now wait for leader replicas to recover
+    log.debug("Waiting for " + numLeaders + " leader replicas to recover");
+    if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for " + replicasToRecover.getCount() + " leader replicas to recover");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for leader replicas to recover");
+    }
+    // remove the watchers, we're done either way. They must be unregistered from the collection
+    // they were registered on: the map key ("collection_replica") is not a collection name, and
+    // passing it made the removal a silent no-op that leaked the watchers.
+    for (Map.Entry<String, CollectionStateWatcher> e : watchers.entrySet()) {
+      zkStateReader.removeCollectionStateWatcher(watcherCollections.get(e.getKey()), e.getValue());
+    }
+    if (anyOneFailed.get()) {
+      log.info("Failed to create some replicas. Cleaning up all replicas on target node");
+      SolrCloseableLatch cleanupLatch = new SolrCloseableLatch(createdReplicas.size(), ocmh);
+      for (ZkNodeProps createdReplica : createdReplicas) {
+        NamedList deleteResult = new NamedList();
+        try {
+          ocmh.deleteReplica(zkStateReader.getClusterState(), createdReplica.plus("parallel", "true"), deleteResult, () -> {
+            cleanupLatch.countDown();
+            if (deleteResult.get("failure") != null) {
+              synchronized (results) {
+                results.add("failure", "Could not cleanup, because of : " + deleteResult.get("failure"));
+              }
+            }
+          });
+        } catch (KeeperException e) {
+          cleanupLatch.countDown();
+          log.warn("Error deleting replica ", e);
+        } catch (Exception e) {
+          log.warn("Error deleting replica ", e);
+          cleanupLatch.countDown();
+          throw e;
+        }
+      }
+      cleanupLatch.await(5, TimeUnit.MINUTES);
+      return;
+    }
+
+
+    // we have reached this far means all replicas could be recreated
+    //now cleanup the replicas in the source node
+    DeleteNodeCmd.cleanupReplicas(results, state, sourceReplicas, ocmh, source, async);
+    results.add("success", "REPLACENODE action completed successfully from  : " + source + " to : " + target);
+  }
+
+  /**
+   * Lists every replica, across all collections in {@code state}, whose node name equals
+   * {@code source}. Each replica is returned as ZkNodeProps holding:
+   * <ul>
+   *   <li>collection and shard name;</li>
+   *   <li>core name, replica name and replica type;</li>
+   *   <li>whether it is its slice's leader;</li>
+   *   <li>the node.</li>
+   * </ul>
+   */
+  static List<ZkNodeProps> getReplicasOfNode(String source, ClusterState state) {
+    List<ZkNodeProps> sourceReplicas = new ArrayList<>();
+    for (Map.Entry<String, DocCollection> e : state.getCollectionsMap().entrySet()) {
+      for (Slice slice : e.getValue().getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          if (source.equals(replica.getNodeName())) {
+            ZkNodeProps props = new ZkNodeProps(
+                COLLECTION_PROP, e.getKey(),
+                SHARD_ID_PROP, slice.getName(),
+                ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
+                ZkStateReader.REPLICA_PROP, replica.getName(),
+                ZkStateReader.REPLICA_TYPE, replica.getType().name(),
+                ZkStateReader.LEADER_PROP, String.valueOf(replica.equals(slice.getLeader())),
+                CoreAdminParams.NODE, source);
+            sourceReplicas.add(props);
+          }
+        }
+      }
+    }
+    return sourceReplicas;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/a3c4f738/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
new file mode 100644
index 0000000..09ceb55
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.api.collections;
+
+
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.backup.BackupManager;
+import org.apache.solr.core.backup.repository.BackupRepository;
+import org.apache.solr.handler.component.ShardHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_TYPE;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import static org.apache.solr.common.params.CommonParams.NAME;
+
+/**
+ * Overseer command that restores a collection from a backup read through {@link BackupManager}.
+ * <p>
+ * The restore runs in phases:
+ * <ol>
+ *   <li>upload the backed-up config set (or reuse an existing one with the same name);</li>
+ *   <li>create a core-less collection mirroring the backed-up active shards and router settings;</li>
+ *   <li>mark every shard CONSTRUCTION;</li>
+ *   <li>create one replica per shard (NRT if any NRT replicas are requested, otherwise TLOG) and restore
+ *       the backed-up index into it;</li>
+ *   <li>mark every shard ACTIVE;</li>
+ *   <li>add the remaining NRT/TLOG/PULL replicas per shard.</li>
+ * </ol>
+ */
+public class RestoreCmd implements OverseerCollectionMessageHandler.Cmd {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final OverseerCollectionMessageHandler ocmh;
+
+  public RestoreCmd(OverseerCollectionMessageHandler ocmh) {
+    this.ocmh = ocmh;
+  }
+
+  /**
+   * Restores the backup named by {@code name} at {@code location} into the collection named by
+   * {@code collection} in {@code message}.
+   *
+   * @param state   not read directly; the current cluster state is always re-read from the {@link ZkStateReader}
+   * @param message the RESTORE request; replica counts, maxShardsPerNode, collection properties and the config
+   *                name in it override the values stored in the backup
+   * @param results receives the responses of the additional (non-first) replica creations
+   * @throws SolrException BAD_REQUEST if the available nodes cannot host the requested replicas, or if neither
+   *                       NRT nor TLOG replicas are requested
+   */
+  @Override
+  public void call(ClusterState state, ZkNodeProps message, NamedList results) throws Exception {
+    // TODO maybe we can inherit createCollection's options/code
+
+    String restoreCollectionName = message.getStr(COLLECTION_PROP);
+    String backupName = message.getStr(NAME); // of backup
+    ShardHandler shardHandler = ocmh.shardHandlerFactory.getShardHandler();
+    String asyncId = message.getStr(ASYNC);
+    String repo = message.getStr(CoreAdminParams.BACKUP_REPOSITORY);
+    Map<String, String> requestMap = new HashMap<>();
+
+    CoreContainer cc = ocmh.overseer.getCoreContainer();
+    BackupRepository repository = cc.newBackupRepository(Optional.ofNullable(repo));
+
+    URI location = repository.createURI(message.getStr(CoreAdminParams.BACKUP_LOCATION));
+    URI backupPath = repository.resolve(location, backupName);
+    ZkStateReader zkStateReader = ocmh.zkStateReader;
+    BackupManager backupMgr = new BackupManager(repository, zkStateReader);
+
+    Properties properties = backupMgr.readBackupProperties(location, backupName);
+    String backupCollection = properties.getProperty(BackupManager.COLLECTION_NAME_PROP);
+    DocCollection backupCollectionState = backupMgr.readCollectionState(location, backupName, backupCollection);
+
+    // Get the Solr nodes to restore a collection.
+    final List<String> nodeList = Assign.getLiveOrLiveAndCreateNodeSetList(
+        zkStateReader.getClusterState().getLiveNodes(), message, OverseerCollectionMessageHandler.RANDOM);
+
+    int numShards = backupCollectionState.getActiveSlices().size();
+
+    // Each count comes from the request, else from the backed-up collection, else 0.
+    // A zero NRT count falls back to replicationFactor.
+    int numNrtReplicas = getInt(message, NRT_REPLICAS, backupCollectionState.getNumNrtReplicas(), 0);
+    if (numNrtReplicas == 0) {
+      numNrtReplicas = getInt(message, REPLICATION_FACTOR, backupCollectionState.getReplicationFactor(), 0);
+    }
+    int numTlogReplicas = getInt(message, TLOG_REPLICAS, backupCollectionState.getNumTlogReplicas(), 0);
+    int numPullReplicas = getInt(message, PULL_REPLICAS, backupCollectionState.getNumPullReplicas(), 0);
+    int totalReplicasPerShard = numNrtReplicas + numTlogReplicas + numPullReplicas;
+
+    int maxShardsPerNode = message.getInt(MAX_SHARDS_PER_NODE, backupCollectionState.getMaxShardsPerNode());
+    int availableNodeCount = nodeList.size();
+    if ((numShards * totalReplicasPerShard) > (availableNodeCount * maxShardsPerNode)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "Solr cloud with available number of nodes:%d is insufficient for"
+              + " restoring a collection with %d shards, total replicas per shard %d and maxShardsPerNode %d."
+              + " Consider increasing maxShardsPerNode value OR number of available nodes.",
+              availableNodeCount, numShards, totalReplicasPerShard, maxShardsPerNode));
+    }
+
+    //Upload the configs
+    String configName = (String) properties.get(OverseerCollectionMessageHandler.COLL_CONF);
+    String restoreConfigName = message.getStr(OverseerCollectionMessageHandler.COLL_CONF, configName);
+    if (zkStateReader.getConfigManager().configExists(restoreConfigName)) {
+      log.info("Using existing config {}", restoreConfigName);
+      //TODO add overwrite option?
+    } else {
+      log.info("Uploading config {}", restoreConfigName);
+      backupMgr.uploadConfigDir(location, backupName, configName, restoreConfigName);
+    }
+
+    log.info("Starting restore into collection={} with backup_name={} at location={}", restoreCollectionName, backupName,
+        location);
+
+    //Create core-less collection
+    {
+      Map<String, Object> propMap = new HashMap<>();
+      propMap.put(Overseer.QUEUE_OPERATION, CREATE.toString());
+      propMap.put("fromApi", "true"); // mostly true.  Prevents autoCreated=true in the collection state.
+      if (properties.get(STATE_FORMAT) == null) {
+        propMap.put(STATE_FORMAT, "2");
+      }
+
+      // inherit settings from input API, defaulting to the backup's setting.  Ex: replicationFactor
+      for (String collProp : OverseerCollectionMessageHandler.COLL_PROPS.keySet()) {
+        Object val = message.getProperties().getOrDefault(collProp, backupCollectionState.get(collProp));
+        if (val != null) {
+          propMap.put(collProp, val);
+        }
+      }
+
+      propMap.put(NAME, restoreCollectionName);
+      propMap.put(OverseerCollectionMessageHandler.CREATE_NODE_SET, OverseerCollectionMessageHandler.CREATE_NODE_SET_EMPTY); //no cores
+      propMap.put(OverseerCollectionMessageHandler.COLL_CONF, restoreConfigName);
+
+      // router.*
+      @SuppressWarnings("unchecked")
+      Map<String, Object> routerProps = (Map<String, Object>) backupCollectionState.getProperties().get(DocCollection.DOC_ROUTER);
+      for (Map.Entry<String, Object> pair : routerProps.entrySet()) {
+        propMap.put(DocCollection.DOC_ROUTER + "." + pair.getKey(), pair.getValue());
+      }
+
+      Set<String> sliceNames = backupCollectionState.getActiveSlicesMap().keySet();
+      if (backupCollectionState.getRouter() instanceof ImplicitDocRouter) {
+        propMap.put(OverseerCollectionMessageHandler.SHARDS_PROP, StrUtils.join(sliceNames, ','));
+      } else {
+        propMap.put(OverseerCollectionMessageHandler.NUM_SLICES, sliceNames.size());
+        // ClusterStateMutator.createCollection detects that "slices" is in fact a slice structure instead of a
+        //   list of names, and if so uses this instead of building it.  We clear the replica list.
+        Collection<Slice> backupSlices = backupCollectionState.getActiveSlices();
+        Map<String, Slice> newSlices = new LinkedHashMap<>(backupSlices.size());
+        for (Slice backupSlice : backupSlices) {
+          newSlices.put(backupSlice.getName(),
+              new Slice(backupSlice.getName(), Collections.emptyMap(), backupSlice.getProperties()));
+        }
+        propMap.put(OverseerCollectionMessageHandler.SHARDS_PROP, newSlices);
+      }
+
+      ocmh.commandMap.get(CREATE).call(zkStateReader.getClusterState(), new ZkNodeProps(propMap), new NamedList());
+      // note: when createCollection() returns, the collection exists (no race)
+    }
+
+    DocCollection restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+    DistributedQueue inQueue = Overseer.getStateUpdateQueue(zkStateReader.getZkClient());
+
+    //Mark all shards in CONSTRUCTION STATE while we restore the data
+    //TODO might instead createCollection accept an initial state?  Is there a race?
+    offerShardStateUpdate(inQueue, restoreCollectionName, restoreCollection.getSlices(), Slice.State.CONSTRUCTION);
+
+    // TODO how do we leverage the RULE / SNITCH logic in createCollection?
+
+    ClusterState clusterState = zkStateReader.getClusterState();
+
+    List<String> sliceNames = new ArrayList<>();
+    restoreCollection.getSlices().forEach(x -> sliceNames.add(x.getName()));
+    PolicyHelper.SessionWrapper sessionWrapper = null;
+
+    try {
+      // Planned placements for every replica of every shard; each position is consumed once below.
+      List<ReplicaPosition> replicaPositions = Assign.identifyNodes(
+          ocmh.cloudManager, clusterState,
+          nodeList, restoreCollectionName,
+          message, sliceNames,
+          numNrtReplicas, numTlogReplicas, numPullReplicas);
+      sessionWrapper = PolicyHelper.getLastSessionWrapper(true); // released in the finally block
+      //Create one replica per shard and copy backed up data to it
+      for (Slice slice : restoreCollection.getSlices()) {
+        log.debug("Adding replica for shard={} collection={} ", slice.getName(), restoreCollection);
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD);
+        propMap.put(COLLECTION_PROP, restoreCollectionName);
+        propMap.put(SHARD_ID_PROP, slice.getName());
+
+        // The first replica of each shard is NRT if any NRT replicas were requested, otherwise TLOG.
+        if (numNrtReplicas >= 1) {
+          propMap.put(REPLICA_TYPE, Replica.Type.NRT.name());
+        } else if (numTlogReplicas >= 1) {
+          propMap.put(REPLICA_TYPE, Replica.Type.TLOG.name());
+        } else {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unexpected number of replicas, replicationFactor, " +
+              Replica.Type.NRT + " or " + Replica.Type.TLOG + " must be greater than 0");
+        }
+
+        // Get the first node matching the shard to restore in
+        takeNodeForShard(replicaPositions, slice.getName(), propMap);
+
+        // add async param
+        if (asyncId != null) {
+          propMap.put(ASYNC, asyncId);
+        }
+        ocmh.addPropertyParams(message, propMap);
+
+        ocmh.addReplica(clusterState, new ZkNodeProps(propMap), new NamedList(), null);
+      }
+
+      //refresh the location copy of collection state
+      restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+      //Copy data from backed up index to each replica
+      for (Slice slice : restoreCollection.getSlices()) {
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminParams.CoreAdminAction.RESTORECORE.toString());
+        params.set(NAME, "snapshot." + slice.getName());
+        params.set(CoreAdminParams.BACKUP_LOCATION, backupPath.toASCIIString());
+        params.set(CoreAdminParams.BACKUP_REPOSITORY, repo);
+
+        ocmh.sliceCmd(clusterState, params, null, slice, shardHandler, asyncId, requestMap);
+      }
+      ocmh.processResponses(new NamedList(), shardHandler, true, "Could not restore core", asyncId, requestMap);
+
+      //Mark all shards in ACTIVE STATE
+      offerShardStateUpdate(inQueue, restoreCollectionName, restoreCollection.getSlices(), Slice.State.ACTIVE);
+
+      //refresh the location copy of collection state
+      restoreCollection = zkStateReader.getClusterState().getCollection(restoreCollectionName);
+
+      if (totalReplicasPerShard > 1) {
+        log.info("Adding replicas to restored collection={}", restoreCollection);
+        for (Slice slice : restoreCollection.getSlices()) {
+
+          //Add the remaining replicas for each shard, considering it's type
+          int createdNrtReplicas = 0, createdTlogReplicas = 0, createdPullReplicas = 0;
+
+          // Account for the replica already created above: NRT if any were requested, otherwise TLOG
+          // (the same choice made when the first replica was created).
+          if (numNrtReplicas > 0) {
+            createdNrtReplicas++;
+          } else if (numTlogReplicas > 0) {
+            createdTlogReplicas++;
+          }
+
+          for (int i = 1; i < totalReplicasPerShard; i++) {
+            Replica.Type typeToCreate;
+            if (createdNrtReplicas < numNrtReplicas) {
+              createdNrtReplicas++;
+              typeToCreate = Replica.Type.NRT;
+            } else if (createdTlogReplicas < numTlogReplicas) {
+              createdTlogReplicas++;
+              typeToCreate = Replica.Type.TLOG;
+            } else {
+              createdPullReplicas++;
+              typeToCreate = Replica.Type.PULL;
+              assert createdPullReplicas <= numPullReplicas: "Unexpected number of replicas";
+            }
+
+            log.debug("Adding replica for shard={} collection={} of type {} ", slice.getName(), restoreCollection, typeToCreate);
+            HashMap<String, Object> propMap = new HashMap<>();
+            propMap.put(COLLECTION_PROP, restoreCollectionName);
+            propMap.put(SHARD_ID_PROP, slice.getName());
+            propMap.put(REPLICA_TYPE, typeToCreate.name());
+
+            // Get the first node matching the shard to restore in
+            takeNodeForShard(replicaPositions, slice.getName(), propMap);
+
+            // add async param
+            if (asyncId != null) {
+              propMap.put(ASYNC, asyncId);
+            }
+            ocmh.addPropertyParams(message, propMap);
+
+            ocmh.addReplica(zkStateReader.getClusterState(), new ZkNodeProps(propMap), results, null);
+          }
+        }
+      }
+
+      log.info("Completed restoring collection={} backupName={}", restoreCollection, backupName);
+    } finally {
+      if (sessionWrapper != null) sessionWrapper.release();
+    }
+  }
+
+  /**
+   * Removes the first position in {@code replicaPositions} planned for {@code shardName} and stores its node
+   * under {@link CoreAdminParams#NODE} in {@code propMap}, so that each planned position is used at most once.
+   * Leaves {@code propMap} unchanged when no position matches the shard.
+   */
+  private static void takeNodeForShard(List<ReplicaPosition> replicaPositions, String shardName,
+                                       Map<String, Object> propMap) {
+    for (Iterator<ReplicaPosition> it = replicaPositions.iterator(); it.hasNext(); ) {
+      ReplicaPosition position = it.next();
+      if (Objects.equals(position.shard, shardName)) {
+        propMap.put(CoreAdminParams.NODE, position.node);
+        it.remove();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Enqueues an UPDATESHARDSTATE message that moves each of {@code shards} of {@code collectionName} to
+   * {@code newState}. This only offers the message to the queue; it does not wait for the state change.
+   */
+  private static void offerShardStateUpdate(DistributedQueue inQueue, String collectionName,
+                                            Collection<Slice> shards, Slice.State newState) throws Exception {
+    Map<String, Object> propMap = new HashMap<>();
+    for (Slice shard : shards) {
+      propMap.put(shard.getName(), newState.toString());
+    }
+    // Reserved keys are written last so that no shard name can overwrite them.
+    propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
+    propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
+    inQueue.offer(Utils.toJSON(new ZkNodeProps(propMap)));
+  }
+
+  /**
+   * Reads an integer property from {@code message}, falling back to {@code count} when the message does not
+   * supply one, and to {@code defaultValue} when that is also {@code null}.
+   */
+  private int getInt(ZkNodeProps message, String propertyName, Integer count, int defaultValue) {
+    Integer value = message.getInt(propertyName, count);
+    return value!=null ? value:defaultValue;
+  }
+}