You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2015/05/14 18:25:53 UTC

svn commit: r1679397 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/handler/admin/ solrj/src/java/org/apache/solr/common/params/

Author: noble
Date: Thu May 14 16:25:52 2015
New Revision: 1679397

URL: http://svn.apache.org/r1679397
Log:
SOLR-7544: CollectionsHandler refactored to be more modular

Added:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1679397&r1=1679396&r2=1679397&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu May 14 16:25:52 2015
@@ -360,6 +360,8 @@ Other Changes
   the actual ErrorCode is used when available.
   (Hrishikesh Gadre via Shawn Heisey)
 
+* SOLR-7544: CollectionsHandler refactored to be more modular (Noble Paul)
+
 ==================  5.1.0 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1679397&r1=1679396&r2=1679397&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu May 14 16:25:52 2015
@@ -17,45 +17,30 @@ package org.apache.solr.handler.admin;
  * limitations under the License.
  */
 
-import static org.apache.solr.cloud.Overseer.*;
-import static org.apache.solr.cloud.OverseerCollectionProcessor.*;
-import static org.apache.solr.common.cloud.DocCollection.*;
-import static org.apache.solr.common.cloud.ZkStateReader.*;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
-import static org.apache.solr.common.params.CommonParams.*;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.cloud.DistributedQueue;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
-import org.apache.solr.cloud.LeaderElector;
-import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.OverseerCollectionProcessor;
 import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.Rule;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -63,7 +48,6 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.params.CollectionParams.CollectionAction;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
@@ -77,7 +61,36 @@ import org.apache.zookeeper.KeeperExcept
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableSet;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_CONF;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.COLL_PROP_PREFIX;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.CREATE_NODE_SET_SHUFFLE;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.NUM_SLICES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_ACTIVE_NODES;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ONLY_IF_DOWN;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.REQUESTID;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARD_UNIQUE;
+import static org.apache.solr.common.cloud.DocCollection.DOC_ROUTER;
+import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
+import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CONFIGS_ZKNODE;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.*;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VALUE_LONG;
+import static org.apache.solr.common.params.CoreAdminParams.DATA_DIR;
+import static org.apache.solr.common.params.CoreAdminParams.INSTANCE_DIR;
+import static org.apache.solr.common.params.ShardParams._ROUTE_;
 
 public class CollectionsHandler extends RequestHandlerBase {
   protected static Logger log = LoggerFactory.getLogger(CollectionsHandler.class);
@@ -85,8 +98,8 @@ public class CollectionsHandler extends
 
   public CollectionsHandler() {
     super();
-    // Unlike most request handlers, CoreContainer initialization 
-    // should happen in the constructor...  
+    // Unlike most request handlers, CoreContainer initialization
+    // should happen in the constructor...
     this.coreContainer = null;
   }
 
@@ -133,528 +146,51 @@ public class CollectionsHandler extends
 
     // Pick the action
     SolrParams params = req.getParams();
-    CollectionAction action = null;
     String a = params.get(CoreAdminParams.ACTION);
     if (a != null) {
-      action = CollectionAction.get(a);
-    }
-    if (action == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown action: " + a);
-    }
-
-    switch (action) {
-      case CREATE: {
-        this.handleCreateAction(req, rsp);
-        break;
-      }
-      case DELETE: {
-        this.handleDeleteAction(req, rsp);
-        break;
-      }
-      case RELOAD: {
-        this.handleReloadAction(req, rsp);
-        break;
-      }
-      case SYNCSHARD: {
-        this.handleSyncShardAction(req, rsp);
-        break;
-      }
-      case CREATEALIAS: {
-        this.handleCreateAliasAction(req, rsp);
-        break;
-      }
-      case DELETEALIAS: {
-        this.handleDeleteAliasAction(req, rsp);
-        break;
-      }
-      case SPLITSHARD:  {
-        this.handleSplitShardAction(req, rsp);
-        break;
-      }
-      case DELETESHARD: {
-        this.handleDeleteShardAction(req, rsp);
-        break;
-      }
-      case CREATESHARD: {
-        this.handleCreateShard(req, rsp);
-        break;
-      }
-      case DELETEREPLICA: {
-        this.handleRemoveReplica(req, rsp);
-        break;
-      }
-      case MIGRATE: {
-        this.handleMigrate(req, rsp);
-        break;
-      }
-      case ADDROLE: {
-        handleRole(ADDROLE, req, rsp);
-        break;
-      }
-      case REMOVEROLE: {
-        handleRole(REMOVEROLE, req, rsp);
-        break;
-      }
-      case CLUSTERPROP: {
-        this.handleProp(req, rsp);
-        break;
-      }
-      case ADDREPLICA:  {
-        this.handleAddReplica(req, rsp);
-        break;
-      }
-      case REQUESTSTATUS: {
-        this.handleRequestStatus(req, rsp);
-        break;
-      }
-      case OVERSEERSTATUS:  {
-        this.handleOverseerStatus(req, rsp);
-        break;
-      }
-      case LIST: {
-        this.handleListAction(req, rsp);
-        break;
-      }
-      case CLUSTERSTATUS:  {
-        this.handleClusterStatus(req, rsp);
-        break;
-      }
-      case ADDREPLICAPROP: {
-        this.handleAddReplicaProp(req, rsp);
-        break;
-      }
-      case DELETEREPLICAPROP: {
-        this.handleDeleteReplicaProp(req, rsp);
-        break;
-      }
-      case BALANCESHARDUNIQUE: {
-        this.handleBalanceShardUnique(req, rsp);
-        break;
-      }
-      case REBALANCELEADERS: {
-        this.handleBalanceLeaders(req, rsp);
-        break;
-      }
-      default: {
-          throw new RuntimeException("Unknown action: " + action);
-      }
-    }
-
-    rsp.setHttpCaching(false);
-  }
-
-
-  private void handleBalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    req.getParams().required().check(COLLECTION_PROP);
-
-    String collectionName = req.getParams().get(COLLECTION_PROP);
-    if (StringUtils.isBlank(collectionName)) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
-    }
-    coreContainer.getZkController().getZkStateReader().updateClusterState(true);
-    ClusterState clusterState = coreContainer.getZkController().getClusterState();
-    DocCollection dc = clusterState.getCollection(collectionName);
-    if (dc == null) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
-    }
-    Map<String, String> currentRequests = new HashMap<>();
-    int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
-    if (max <= 0) max = Integer.MAX_VALUE;
-    int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
-    NamedList<Object> results = new NamedList<>();
-
-    boolean keepGoing = true;
-    for (Slice slice : dc.getSlices()) {
-      insurePreferredIsLeader(req, results, slice, currentRequests);
-      if (currentRequests.size() == max) {
-        log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
-        keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
-        if (keepGoing == false) {
-          break; // If we've waited longer than specified, don't continue to wait!
-        }
+      CollectionAction action = CollectionAction.get(a);
+      if (action == null)
+        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
+      CollectionOperation operation = CollectionOperation.get(action);
+      log.info("Invoked Collection Action :{} with params{} ", action.toLower(), req.getParamString());
+      Map<String, Object> result = operation.call(req, rsp, this);
+      if (result != null) {
+        result.put(QUEUE_OPERATION, operation.action.toLower());
+        ZkNodeProps props = new ZkNodeProps(result);
+        handleResponse(operation.action.toLower(), props, rsp, operation.timeOut);
       }
-    }
-    if (keepGoing == true) {
-      keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
-    }
-    if (keepGoing == true) {
-      log.info("All leader reassignments completed.");
     } else {
-      log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
-    }
-
-    rsp.getValues().addAll(results);
-  }
-
-  private void insurePreferredIsLeader(SolrQueryRequest req, NamedList<Object> results,
-                                              Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
-    final String inactivePreferreds = "inactivePreferreds";
-    final String alreadyLeaders = "alreadyLeaders";
-    String collectionName = req.getParams().get(COLLECTION_PROP);
-
-    for (Replica replica : slice.getReplicas()) {
-      // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
-      if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
-        continue;
-      }
-      // OK, we are the preferred leader, are we the actual leader?
-      if (replica.getBool(LEADER_PROP, false)) {
-        //We're a preferred leader, but we're _also_ the leader, don't need to do anything.
-        NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
-        if (noops == null) {
-          noops = new NamedList<>();
-          results.add(alreadyLeaders, noops);
-        }
-        NamedList<Object> res = new NamedList<>();
-        res.add("status", "success");
-        res.add("msg", "Already leader");
-        res.add("shard", slice.getName());
-        res.add("nodeName", replica.getNodeName());
-        noops.add(replica.getName(), res);
-        return; // already the leader, do nothing.
-      }
-
-      // We're the preferred leader, but someone else is leader. Only become leader if we're active.
-      if (replica.getState() != Replica.State.ACTIVE) {
-        NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
-        if (inactives == null) {
-          inactives = new NamedList<>();
-          results.add(inactivePreferreds, inactives);
-        }
-        NamedList<Object> res = new NamedList<>();
-        res.add("status", "skipped");
-        res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
-        res.add("shard", slice.getName());
-        res.add("nodeName", replica.getNodeName());
-        inactives.add(replica.getName(), res);
-        return; // Don't try to become the leader if we're not active!
-      }
-
-      // Replica is the preferred leader but not the actual leader, do something about that.
-      // "Something" is
-      // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
-      // 2> tell the actual leader to re-queue itself.
-
-      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-
-      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
-          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-      if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
-        log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
-            "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
-        return;
-      }
-
-      // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
-      // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
-      // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
-      // watching the leader node..
-
-      String firstWatcher = electionNodes.get(1);
-
-      if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
-        makeReplicaFirstWatcher(collectionName, slice, replica);
-      }
-
-      String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
-      rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
-      waitForNodeChange(collectionName, slice, electionNodes.get(0));
-
-
-      return; // Done with this slice, skip the rest of the replicas.
-    }
-  }
-  // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
-  void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
-      throws KeeperException, InterruptedException {
-
-    ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-    List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
-        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-    // First, queue up the preferred leader at the head of the queue.
-    int newSeq = -1;
-    for (String electionNode : electionNodes) {
-      if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
-        String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
-        rejoinElection(collectionName, slice, electionNode, coreName, true);
-        newSeq = waitForNodeChange(collectionName, slice, electionNode);
-        break;
-      }
-    }
-    if (newSeq == -1) {
-      return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
-    }
-
-    List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
-        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-
-    // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
-    electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
-        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-    for (String thisNode : electionNodes) {
-      if (LeaderElector.getSeq(thisNode) > newSeq) {
-        break;
-      }
-      if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
-        continue;
-      }
-      if (LeaderElector.getSeq(thisNode) == newSeq) {
-        String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
-        rejoinElection(collectionName, slice, thisNode, coreName, false);
-        waitForNodeChange(collectionName, slice, thisNode);
-      }
-    }
-  }
-
-  int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
-    String nodeName = LeaderElector.getNodeName(electionNode);
-    int oldSeq = LeaderElector.getSeq(electionNode);
-     for (int idx = 0; idx < 600; ++idx) {
-      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
-          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-      for (String testNode : electionNodes) {
-        if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
-          return LeaderElector.getSeq(testNode);
-        }
-      }
-
-      Thread.sleep(100);
-    }
-    return -1;
-  }
-  private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
-                              boolean rejoinAtHead) throws KeeperException, InterruptedException {
-    Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
-    Map<String, Object> propMap = new HashMap<>();
-    propMap.put(COLLECTION_PROP, collectionName);
-    propMap.put(SHARD_ID_PROP, slice.getName());
-    propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
-    propMap.put(CORE_NAME_PROP, core);
-    propMap.put(NODE_NAME_PROP, replica.getName());
-    propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
-    propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
-    propMap.put(ELECTION_NODE_PROP, electionNode);
-    String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
-    propMap.put(ASYNC, asyncId);
-    ZkNodeProps m = new ZkNodeProps(propMap);
-    SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
-    handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
-  }
-
-  // currentAsyncIds - map of request IDs and reporting data (value)
-  // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
-  // waitForAll - if true, do not return until all assignments have been made.
-  // results - a place to stash results for reporting back to the user.
-  //
-  private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
-                                      Boolean waitForAll, NamedList<Object> results)
-      throws KeeperException, InterruptedException {
-
-    if (currentAsyncIds.size() == 0) return true;
-
-    for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
-      Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
-      boolean foundChange = false;
-      while (iter.hasNext()) {
-        Map.Entry<String, String> pair = iter.next();
-        String asyncId = pair.getKey();
-        if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
-          coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
-          NamedList<Object> fails = (NamedList<Object>) results.get("failures");
-          if (fails == null) {
-            fails = new NamedList<>();
-            results.add("failures", fails);
-          }
-          NamedList<Object> res = new NamedList<>();
-          res.add("status", "failed");
-          res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
-          fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
-          iter.remove();
-          foundChange = true;
-        } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
-          coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
-          NamedList<Object> successes = (NamedList<Object>) results.get("successes");
-          if (successes == null) {
-            successes = new NamedList<>();
-            results.add("successes", successes);
-          }
-          NamedList<Object> res = new NamedList<>();
-          res.add("status", "success");
-          res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
-          successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
-          iter.remove();
-          foundChange = true;
-        }
-      }
-      // We're done if we're processing a few at a time or all requests are processed.
-      if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
-        return true;
-      }
-      Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
-    }
-    return false;
-  }
-  private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP);
-
-
-    Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
-    copyIfNotNull(req.getParams(), map, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP,
-        SHARD_UNIQUE, PROPERTY_VALUE_PROP);
-
-    String property = (String) map.get(PROPERTY_PROP);
-    if (property.startsWith(OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
-      property = OverseerCollectionProcessor.COLL_PROP_PREFIX + property;
-    }
-
-    boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE));
-
-    // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas
-    // in a slice on properties that are known to only be one-per-slice and error out if so.
-    if (StringUtils.isNotBlank((String)map.get(SHARD_UNIQUE)) &&
-        SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) &&
-        uniquePerSlice == false) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          "Overseer replica property command received for property " + property +
-              " with the " + SHARD_UNIQUE +
-              " parameter set to something other than 'true'. No action taken.");
-    }
-    handleResponse(ADDREPLICAPROP.toLower(), new ZkNodeProps(map), rsp);
-  }
-
-  private void handleDeleteReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP);
-
-    Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
-    copyIfNotNull(req.getParams(), map, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
+      throw new SolrException(ErrorCode.BAD_REQUEST, "action is a required param");
 
-    handleResponse(DELETEREPLICAPROP.toLower(), new ZkNodeProps(map), rsp);
-  }
-
-
-
-  private void handleBalanceShardUnique(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP);
-    Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE));
-    String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT);
-    if (StringUtils.startsWith(prop, OverseerCollectionProcessor.COLL_PROP_PREFIX) == false) {
-      prop = OverseerCollectionProcessor.COLL_PROP_PREFIX + prop;
     }
 
-    if (shardUnique == false &&
-        SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop) == false) {
-      throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
-      + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
-      " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));
-    }
-
-    Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
-    copyIfNotNull(req.getParams(), map, COLLECTION_PROP, PROPERTY_PROP, ONLY_ACTIVE_NODES, SHARD_UNIQUE);
-
-    handleResponse(BALANCESHARDUNIQUE.toLower(), new ZkNodeProps(map), rsp);
+    rsp.setHttpCaching(false);
   }
 
-  private void handleOverseerStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    Map<String, Object> props = ZkNodeProps.makeMap(
-        Overseer.QUEUE_OPERATION, OVERSEERSTATUS.toLower());
-    handleResponse(OVERSEERSTATUS.toLower(), new ZkNodeProps(props), rsp);
-  }
 
-  private void handleProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    req.getParams().required().check(NAME);
-    String name = req.getParams().get(NAME);
-    String val = req.getParams().get(VALUE_LONG);
-    coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val);
-  }
 
-  static Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
 
-  private void handleRole(CollectionAction action, SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    req.getParams().required().check("role", "node");
-    Map<String, Object> map = ZkNodeProps.makeMap(Overseer.QUEUE_OPERATION, action.toLower());
-    copyIfNotNull(req.getParams(), map,"role", "node");
-    ZkNodeProps m = new ZkNodeProps(map);
-    if(!KNOWN_ROLES.contains(m.getStr("role"))) throw new SolrException(ErrorCode.BAD_REQUEST,"Unknown role. Supported roles are ,"+ KNOWN_ROLES);
-    handleResponse(action.toString().toLowerCase(Locale.ROOT), m, rsp);
-  }
+  static final Set<String> KNOWN_ROLES = ImmutableSet.of("overseer");
 
   public static long DEFAULT_ZK_TIMEOUT = 180*1000;
 
-  private void handleRequestStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.debug("REQUESTSTATUS action invoked: " + req.getParamString());
-    req.getParams().required().check(REQUESTID);
-
-    String requestId = req.getParams().get(REQUESTID);
-
-    if (requestId.equals("-1")) {
-      // Special taskId (-1), clears up the request state maps.
-      if(requestId.equals("-1")) {
-        coreContainer.getZkController().getOverseerCompletedMap().clear();
-        coreContainer.getZkController().getOverseerFailureMap().clear();
-        return;
-      }
-    } else {
-      NamedList<Object> results = new NamedList<>();
-      if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) {
-        SimpleOrderedMap success = new SimpleOrderedMap();
-        success.add("state", "completed");
-        success.add("msg", "found " + requestId + " in completed tasks");
-        results.add("status", success);
-      } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
-        SimpleOrderedMap success = new SimpleOrderedMap();
-        success.add("state", "failed");
-        success.add("msg", "found " + requestId + " in failed tasks");
-        results.add("status", success);
-      } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
-        SimpleOrderedMap success = new SimpleOrderedMap();
-        success.add("state", "running");
-        success.add("msg", "found " + requestId + " in running tasks");
-        results.add("status", success);
-      } else if(overseerCollectionQueueContains(requestId)){
-        SimpleOrderedMap success = new SimpleOrderedMap();
-        success.add("state", "submitted");
-        success.add("msg", "found " + requestId + " in submitted tasks");
-        results.add("status", success);
-      } else {
-        SimpleOrderedMap failure = new SimpleOrderedMap();
-        failure.add("state", "notfound");
-        failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
-        results.add("status", failure);
-      }
-      SolrResponse response = new OverseerSolrResponse(results);
-
-      rsp.getValues().addAll(response.getResponse());
-    }
-  }
-
-  private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
-    DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
-    return collectionQueue.containsTaskWithRequestId(asyncId);
-  }
-
-  private void handleResponse(String operation, ZkNodeProps m,
+  void handleResponse(String operation, ZkNodeProps m,
                               SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     handleResponse(operation, m, rsp, DEFAULT_ZK_TIMEOUT);
   }
-  
+
   private void handleResponse(String operation, ZkNodeProps m,
       SolrQueryResponse rsp, long timeout) throws KeeperException, InterruptedException {
     long time = System.nanoTime();
 
      if(m.containsKey(ASYNC) && m.get(ASYNC) != null) {
- 
+
        String asyncId = m.getStr(ASYNC);
- 
+
        if(asyncId.equals("-1")) {
          throw new SolrException(ErrorCode.BAD_REQUEST, "requestid can not be -1. It is reserved for cleanup purposes.");
        }
- 
+
        NamedList<String> r = new NamedList<>();
 
        if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
@@ -662,16 +198,16 @@ public class CollectionsHandler extends
            coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||
            overseerCollectionQueueContains(asyncId)) {
          r.add("error", "Task with the same requestid already exists.");
- 
+
        } else {
          coreContainer.getZkController().getOverseerCollectionQueue()
              .offer(ZkStateReader.toJSON(m));
        }
        r.add(CoreAdminParams.REQUESTID, (String) m.get(ASYNC));
        SolrResponse response = new OverseerSolrResponse(r);
- 
+
        rsp.getValues().addAll(response.getResponse());
- 
+
        return;
      }
 
@@ -702,139 +238,10 @@ public class CollectionsHandler extends
       }
     }
   }
-  
-  private void handleReloadAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.info("Reloading Collection : " + req.getParamString());
-    String name = req.getParams().required().get(NAME);
-    
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        RELOAD.toLower(), NAME, name);
-
-    handleResponse(RELOAD.toLower(), m, rsp);
-  }
-  
-  private void handleSyncShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException, SolrServerException, IOException {
-    log.info("Syncing shard : " + req.getParamString());
-    String collection = req.getParams().required().get("collection");
-    String shard = req.getParams().required().get("shard");
-    
-    ClusterState clusterState = coreContainer.getZkController().getClusterState();
-    
-    ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
-    ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
-    
-    ;
-    try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
-      client.setConnectionTimeout(15000);
-      client.setSoTimeout(60000);
-      RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
-      reqSyncShard.setCollection(collection);
-      reqSyncShard.setShard(shard);
-      reqSyncShard.setCoreName(nodeProps.getCoreName());
-      client.request(reqSyncShard);
-    }
-  }
-  
-  private void handleCreateAliasAction(SolrQueryRequest req,
-      SolrQueryResponse rsp) throws Exception {
-    log.info("Create alias action : " + req.getParamString());
-    String name = req.getParams().required().get(NAME);
-    String collections = req.getParams().required().get("collections");
-    
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        CREATEALIAS.toLower(), NAME, name, "collections",
-        collections);
-    
-    handleResponse(CREATEALIAS.toLower(), m, rsp);
-  }
-  
-  private void handleDeleteAliasAction(SolrQueryRequest req,
-      SolrQueryResponse rsp) throws Exception {
-    log.info("Delete alias action : " + req.getParamString());
-    String name = req.getParams().required().get(NAME);
-    
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        DELETEALIAS.toLower(), NAME, name);
-    
-    handleResponse(DELETEALIAS.toLower(), m, rsp);
-  }
-
-  private void handleDeleteAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.info("Deleting Collection : " + req.getParamString());
-
-    String name = req.getParams().required().get(NAME);
-    
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        DELETE.toLower(), NAME, name);
-
-    handleResponse(DELETE.toLower(), m, rsp);
-  }
-
-  // very simple currently, you can pass a template collection, and the new collection is created on
-  // every node the template collection is on
-  // there is a lot more to add - you should also be able to create with an explicit server list
-  // we might also want to think about error handling (add the request to a zk queue and involve overseer?)
-  // as well as specific replicas= options
-  private void handleCreateAction(SolrQueryRequest req,
-      SolrQueryResponse rsp) throws InterruptedException, KeeperException {
-    log.info("Creating Collection : " + req.getParamString());
-    String name = req.getParams().required().get(NAME);
-    if (name == null) {
-      log.error("Collection name is required to create a new collection");
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          "Collection name is required to create a new collection");
-    }
-
-    Map<String,Object> props = ZkNodeProps.makeMap(
-        Overseer.QUEUE_OPERATION,
-        CREATE.toLower(),
-        "fromApi","true");
-    copyIfNotNull(req.getParams(),props,
-        NAME,
-        REPLICATION_FACTOR,
-         COLL_CONF,
-         NUM_SLICES,
-         MAX_SHARDS_PER_NODE,
-         CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
-         SHARDS_PROP,
-         ASYNC,
-         DocCollection.STATE_FORMAT,
-         AUTO_ADD_REPLICAS,
-        "router.");
-    if(props.get(DocCollection.STATE_FORMAT) == null){
-      props.put(DocCollection.STATE_FORMAT,"2");
-    }
-    addRuleMap(req.getParams(), props, "rule");
-    addRuleMap(req.getParams(), props, "snitch");
-
-    if(SYSTEM_COLL.equals(name)){
-      //We must always create asystem collection with only a single shard
-      props.put(NUM_SLICES,1);
-      props.remove(SHARDS_PROP);
-      createSysConfigSet();
-
-    }
-    copyPropertiesIfNotNull(req.getParams(), props);
-
-    ZkNodeProps m = new ZkNodeProps(props);
-    handleResponse(CREATE.toLower(), m, rsp);
-  }
-
-  private void addRuleMap(SolrParams params, Map<String, Object> props, String key) {
-    String[] rules = params.getParams(key);
-    if(rules!= null && rules.length >0){
-      ArrayList<Map> l = new ArrayList<>();
-      for (String rule : rules) l.add(Rule.parseRule(rule));
-      props.put(key, l);
-    }
-  }
 
-  private void createSysConfigSet() throws KeeperException, InterruptedException {
-    SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
-    createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE, null);
-    createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL, null);
-    createNodeIfNotExists(zk,ZkStateReader.CONFIGS_ZKNODE+"/"+SYSTEM_COLL+"/schema.xml", BlobHandler.SCHEMA.replaceAll("'","\"").getBytes(StandardCharsets.UTF_8));
-    createNodeIfNotExists(zk, ZkStateReader.CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml", BlobHandler.CONF.replaceAll("'", "\"").getBytes(StandardCharsets.UTF_8));
+  private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
+    DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
+    return collectionQueue.containsTaskWithRequestId(asyncId);
   }
 
   public static void createNodeIfNotExists(SolrZkClient zk, String path, byte[] data) throws KeeperException, InterruptedException {
@@ -848,199 +255,452 @@ public class CollectionsHandler extends
     }
   }
 
-
-  private void handleRemoveReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.info("Remove replica: " + req.getParamString());
-    req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP, "replica");
-    Map<String, Object> map = makeMap(QUEUE_OPERATION, DELETEREPLICA.toLower());
-    copyIfNotNull(req.getParams(),map,COLLECTION_PROP,SHARD_ID_PROP,"replica", ASYNC, ONLY_IF_DOWN);
-    ZkNodeProps m = new ZkNodeProps(map);
-    handleResponse(DELETEREPLICA.toLower(), m, rsp);
-  }
-
-
-  private void handleCreateShard(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.info("Create shard: " + req.getParamString());
-    req.getParams().required().check(COLLECTION_PROP, SHARD_ID_PROP);
-    ClusterState clusterState = coreContainer.getZkController().getClusterState();
-    if (!ImplicitDocRouter.NAME.equals(((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(DOC_ROUTER)).get(NAME)))
-      throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections" );
-
-    Map<String, Object> map = makeMap(QUEUE_OPERATION, CREATESHARD.toLower());
-    copyIfNotNull(req.getParams(),map,COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.REPLICATION_FACTOR, CREATE_NODE_SET, ASYNC);
-    copyPropertiesIfNotNull(req.getParams(), map);
-    ZkNodeProps m = new ZkNodeProps(map);
-    handleResponse(CREATESHARD.toLower(), m, rsp);
-  }
-
-  private static void copyIfNotNull(SolrParams params, Map<String, Object> props, String... keys) {
-    ArrayList<String> prefixes = new ArrayList<>(1);
-    if(keys !=null){
-      for (String key : keys) {
-        if(key.endsWith(".")) {
-          prefixes.add(key);
-          continue;
-        }
-        String v = params.get(key);
-        if(v != null) props.put(key,v);
-      }
-    }
-    if(prefixes.isEmpty()) return;
-    Iterator<String> it = params.getParameterNamesIterator();
-    String prefix = null;
-    for(;it.hasNext();){
-      String name = it.next();
-      for (int i = 0; i < prefixes.size(); i++) {
-        if(name.startsWith(prefixes.get(i))){
-          String val = params.get(name);
-          if(val !=null) props.put(name,val);
-        }
-      }
-    }
-
-  }
-
-  private void copyPropertiesIfNotNull(SolrParams params, Map<String, Object> props) {
+  private static Map<String, Object> copyPropertiesWithPrefix(SolrParams params, Map<String, Object> props, String prefix) {
     Iterator<String> iter =  params.getParameterNamesIterator();
     while (iter.hasNext()) {
       String param = iter.next();
-      if (param.startsWith(OverseerCollectionProcessor.COLL_PROP_PREFIX)) {
+      if (param.startsWith(prefix)) {
         props.put(param, params.get(param));
       }
     }
+    return props;
   }
 
+  public static ModifiableSolrParams params(String... params) {
+    ModifiableSolrParams msp = new ModifiableSolrParams();
+    for (int i = 0; i < params.length; i += 2) {
+      msp.add(params[i], params[i + 1]);
+    }
+    return msp;
+  }
 
-  private void handleDeleteShardAction(SolrQueryRequest req,
-      SolrQueryResponse rsp) throws InterruptedException, KeeperException {
-    log.info("Deleting Shard : " + req.getParamString());
-    String name = req.getParams().required().get(ZkStateReader.COLLECTION_PROP);
-    String shard = req.getParams().required().get(ZkStateReader.SHARD_ID_PROP);
-    
-    Map<String,Object> props = new HashMap<>();
-    props.put(ZkStateReader.COLLECTION_PROP, name);
-    props.put(Overseer.QUEUE_OPERATION, DELETESHARD.toLower());
-    props.put(ZkStateReader.SHARD_ID_PROP, shard);
+  //////////////////////// SolrInfoMBeans methods //////////////////////
 
-    ZkNodeProps m = new ZkNodeProps(props);
-    handleResponse(DELETESHARD.toLower(), m, rsp);
+  @Override
+  public String getDescription() {
+    return "Manage SolrCloud Collections";
   }
 
-  private void handleSplitShardAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.info("Splitting shard : " + req.getParamString());
-    String name = req.getParams().required().get("collection");
-    // TODO : add support for multiple shards
-    String shard = req.getParams().get("shard");
-    String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
-    String splitKey = req.getParams().get("split.key");
+  public static final String SYSTEM_COLL = ".system";
 
-    if (splitKey == null && shard == null) {
-      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "Missing required parameter: shard");
-    }
-    if (splitKey != null && shard != null)  {
-      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
-          "Only one of 'shard' or 'split.key' should be specified");
-    }
-    if (splitKey != null && rangesStr != null)  {
-      throw new SolrException( SolrException.ErrorCode.BAD_REQUEST,
-          "Only one of 'ranges' or 'split.key' should be specified");
-    }
+  enum CollectionOperation {
+    /**
+     * very simple currently, you can pass a template collection, and the new collection is created on
+     * every node the template collection is on
+     * there is a lot more to add - you should also be able to create with an explicit server list
+     * we might also want to think about error handling (add the request to a zk queue and involve overseer?)
+     * as well as specific replicas= options
+     */
+    CREATE_OP(CREATE) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+          throws KeeperException, InterruptedException {
+        Map<String, Object> props = req.getParams().required().getAll(null, NAME);
+        props.put("fromApi", "true");
+        req.getParams().getAll(props,
+            NAME,
+            REPLICATION_FACTOR,
+            COLL_CONF,
+            NUM_SLICES,
+            MAX_SHARDS_PER_NODE,
+            CREATE_NODE_SET, CREATE_NODE_SET_SHUFFLE,
+            SHARDS_PROP,
+            ASYNC,
+            STATE_FORMAT,
+            AUTO_ADD_REPLICAS);
 
-    Map<String,Object> props = new HashMap<>();
-    props.put(Overseer.QUEUE_OPERATION, SPLITSHARD.toLower());
-    props.put("collection", name);
-    if (shard != null)  {
-      props.put(ZkStateReader.SHARD_ID_PROP, shard);
-    }
-    if (splitKey != null) {
-      props.put("split.key", splitKey);
-    }
-    if (rangesStr != null)  {
-      props.put(CoreAdminParams.RANGES, rangesStr);
-    }
+        if (props.get(STATE_FORMAT) == null) {
+          props.put(STATE_FORMAT, "2");
+        }
+        addRuleMap(req.getParams(), props, "rule");
+        addRuleMap(req.getParams(), props, "snitch");
 
-    if (req.getParams().get(ASYNC) != null)
-      props.put(ASYNC, req.getParams().get(ASYNC));
+        if (SYSTEM_COLL.equals(props.get(NAME))) {
+          //We must always create asystem collection with only a single shard
+          props.put(NUM_SLICES, 1);
+          props.remove(SHARDS_PROP);
+          createSysConfigSet(h.coreContainer);
 
-    copyPropertiesIfNotNull(req.getParams(), props);
+        }
+        copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
+        return copyPropertiesWithPrefix(req.getParams(), props, "router.");
 
-    ZkNodeProps m = new ZkNodeProps(props);
+      }
 
-    handleResponse(SPLITSHARD.toLower(), m, rsp, DEFAULT_ZK_TIMEOUT * 5);
-  }
+      private void addRuleMap(SolrParams params, Map<String, Object> props, String key) {
+        String[] rules = params.getParams(key);
+        if (rules != null && rules.length > 0) {
+          ArrayList<Map> l = new ArrayList<>();
+          for (String rule : rules) l.add(Rule.parseRule(rule));
+          props.put(key, l);
+        }
+      }
 
-  private void handleMigrate(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    log.info("Migrate action invoked: " + req.getParamString());
-    req.getParams().required().check("collection", "split.key", "target.collection");
-    Map<String,Object> props = new HashMap<>();
-    props.put(Overseer.QUEUE_OPERATION, MIGRATE.toLower());
-    copyIfNotNull(req.getParams(), props, "collection", "split.key", "target.collection", "forward.timeout", ASYNC);
-    ZkNodeProps m = new ZkNodeProps(props);
-    handleResponse(MIGRATE.toLower(), m, rsp, DEFAULT_ZK_TIMEOUT * 20);
-  }
+      private void createSysConfigSet(CoreContainer coreContainer) throws KeeperException, InterruptedException {
+        SolrZkClient zk = coreContainer.getZkController().getZkStateReader().getZkClient();
+        createNodeIfNotExists(zk, CONFIGS_ZKNODE, null);
+        createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL, null);
+        createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/schema.xml",
+            BlobHandler.SCHEMA.replaceAll("'", "\"").getBytes(UTF_8));
+        createNodeIfNotExists(zk, CONFIGS_ZKNODE + "/" + SYSTEM_COLL + "/solrconfig.xml",
+            BlobHandler.CONF.replaceAll("'", "\"").getBytes(UTF_8));
+      }
+    },
+    DELETE_OP(DELETE) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+          throws Exception {
+        return req.getParams().required().getAll(null, NAME);
+      }
+    },
+    RELOAD_OP(RELOAD) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+          throws Exception {
+        return req.getParams().required().getAll(null, NAME);
+
+      }
+    },
+    SYNCSHARD_OP(SYNCSHARD) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+          throws Exception {
+        String collection = req.getParams().required().get("collection");
+        String shard = req.getParams().required().get("shard");
+
+        ClusterState clusterState = h.coreContainer.getZkController().getClusterState();
+
+        ZkNodeProps leaderProps = clusterState.getLeader(collection, shard);
+        ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
+
+        ;
+        try (HttpSolrClient client = new HttpSolrClient(nodeProps.getBaseUrl())) {
+          client.setConnectionTimeout(15000);
+          client.setSoTimeout(60000);
+          RequestSyncShard reqSyncShard = new CoreAdminRequest.RequestSyncShard();
+          reqSyncShard.setCollection(collection);
+          reqSyncShard.setShard(shard);
+          reqSyncShard.setCoreName(nodeProps.getCoreName());
+          client.request(reqSyncShard);
+        }
+        return null;
+      }
 
-  private void handleAddReplica(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException  {
-    log.info("Add replica action invoked: " + req.getParamString());
-    Map<String,Object> props = new HashMap<>();
-    props.put(Overseer.QUEUE_OPERATION, CollectionAction.ADDREPLICA.toString());
-    copyIfNotNull(req.getParams(), props, COLLECTION_PROP, "node", SHARD_ID_PROP, ShardParams._ROUTE_,
-        CoreAdminParams.NAME, CoreAdminParams.INSTANCE_DIR, CoreAdminParams.DATA_DIR, ASYNC);
-    copyPropertiesIfNotNull(req.getParams(), props);
-    ZkNodeProps m = new ZkNodeProps(props);
-    handleResponse(CollectionAction.ADDREPLICA.toString(), m, rsp);
-  }
+    },
+    CREATEALIAS_OP(CREATEALIAS) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+          throws Exception {
+        return req.getParams().required().getAll(null, NAME, "collections");
+      }
+    },
+    DELETEALIAS_OP(DELETEALIAS) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+          throws Exception {
+        return req.getParams().required().getAll(null, NAME);
+      }
+
+    },
+    SPLITSHARD_OP(SPLITSHARD, DEFAULT_ZK_TIMEOUT * 5) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+          throws Exception {
+        String name = req.getParams().required().get(COLLECTION_PROP);
+        // TODO : add support for multiple shards
+        String shard = req.getParams().get(SHARD_ID_PROP);
+        String rangesStr = req.getParams().get(CoreAdminParams.RANGES);
+        String splitKey = req.getParams().get("split.key");
 
-  /**
-   * Handle cluster status request.
-   * Can return status per specific collection/shard or per all collections.
-   *
-   * @param req solr request
-   * @param rsp solr response
-   */
-  private void handleClusterStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    Map<String,Object> props = new HashMap<>();
-    props.put(Overseer.QUEUE_OPERATION, CollectionAction.CLUSTERSTATUS.toLower());
-    copyIfNotNull(req.getParams(), props, COLLECTION_PROP, SHARD_ID_PROP, ShardParams._ROUTE_);
-    handleResponse(CollectionAction.CLUSTERSTATUS.toString(), new ZkNodeProps(props), rsp);
-  }
+        if (splitKey == null && shard == null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Missing required parameter: shard");
+        }
+        if (splitKey != null && shard != null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST,
+              "Only one of 'shard' or 'split.key' should be specified");
+        }
+        if (splitKey != null && rangesStr != null) {
+          throw new SolrException(ErrorCode.BAD_REQUEST,
+              "Only one of 'ranges' or 'split.key' should be specified");
+        }
 
-  /**
-   * Handled list collection request.
-   * Do list collection request to zk host
-   *
-   * @param req solr request
-   * @param rsp solr response
-   * @throws KeeperException      zk connection failed
-   * @throws InterruptedException connection interrupted
-   */
-  private void handleListAction(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
-    NamedList<Object> results = new NamedList<>();
-    Set<String> collections = coreContainer.getZkController().getZkStateReader().getClusterState().getCollections();
-    List<String> collectionList = new ArrayList<>();
-    for (String collection : collections) {
-      collectionList.add(collection);
-    }
-    results.add("collections", collectionList);
-    SolrResponse response = new OverseerSolrResponse(results);
+        Map<String, Object> map = req.getParams().getAll(null,
+            COLLECTION_PROP,
+            SHARD_ID_PROP,
+            "split.key",
+            CoreAdminParams.RANGES,
+            ASYNC);
+        return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
+      }
+    },
+    DELETESHARD_OP(DELETESHARD) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+        return req.getParams().required().getAll(null,
+            COLLECTION_PROP,
+            SHARD_ID_PROP);
+      }
+    },
+    CREATESHARD_OP(CREATESHARD) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null,
+            COLLECTION_PROP,
+            SHARD_ID_PROP);
+        ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+        if (!ImplicitDocRouter.NAME.equals(((Map) clusterState.getCollection(req.getParams().get(COLLECTION_PROP)).get(DOC_ROUTER)).get(NAME)))
+          throw new SolrException(ErrorCode.BAD_REQUEST, "shards can be added only to 'implicit' collections");
+        req.getParams().getAll(map,
+            REPLICATION_FACTOR,
+            CREATE_NODE_SET, ASYNC);
+        return copyPropertiesWithPrefix(req.getParams(), map, COLL_PROP_PREFIX);
+      }
+    },
+    DELETEREPLICA_OP(DELETEREPLICA) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null,
+            COLLECTION_PROP,
+            SHARD_ID_PROP,
+            REPLICA_PROP);
+        return req.getParams().getAll(map, ASYNC, ONLY_IF_DOWN);
+      }
+    },
+    MIGRATE_OP(MIGRATE) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null, COLLECTION_PROP, "split.key", "target.collection");
+        return req.getParams().getAll(map, "forward.timeout", ASYNC);
+      }
+    },
+    ADDROLE_OP(ADDROLE) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null, "role", "node");
+        if (!KNOWN_ROLES.contains(map.get("role")))
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown role. Supported roles are ," + KNOWN_ROLES);
+        return map;
+      }
+    },
+    REMOVEROLE_OP(REMOVEROLE) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null, "role", "node");
+        if (!KNOWN_ROLES.contains(map.get("role")))
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown role. Supported roles are ," + KNOWN_ROLES);
+        return map;
+      }
+    },
+    CLUSTERPROP_OP(CLUSTERPROP) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        String name = req.getParams().required().get(NAME);
+        String val = req.getParams().get(VALUE_LONG);
+        h.coreContainer.getZkController().getZkStateReader().setClusterProperty(name, val);
+        return null;
+      }
+    },
+    REQUESTSTATUS_OP(REQUESTSTATUS) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        CoreContainer coreContainer = h.coreContainer;
+        req.getParams().required().check(REQUESTID);
+
+        String requestId = req.getParams().get(REQUESTID);
+
+        if (requestId.equals("-1")) {
+          // Special taskId (-1), clears up the request state maps.
+          if (requestId.equals("-1")) {
+            coreContainer.getZkController().getOverseerCompletedMap().clear();
+            coreContainer.getZkController().getOverseerFailureMap().clear();
+            return null;
+          }
+        } else {
+          NamedList<Object> results = new NamedList<>();
+          if (coreContainer.getZkController().getOverseerCompletedMap().contains(requestId)) {
+            SimpleOrderedMap success = new SimpleOrderedMap();
+            success.add("state", "completed");
+            success.add("msg", "found " + requestId + " in completed tasks");
+            results.add("status", success);
+          } else if (coreContainer.getZkController().getOverseerFailureMap().contains(requestId)) {
+            SimpleOrderedMap success = new SimpleOrderedMap();
+            success.add("state", "failed");
+            success.add("msg", "found " + requestId + " in failed tasks");
+            results.add("status", success);
+          } else if (coreContainer.getZkController().getOverseerRunningMap().contains(requestId)) {
+            SimpleOrderedMap success = new SimpleOrderedMap();
+            success.add("state", "running");
+            success.add("msg", "found " + requestId + " in running tasks");
+            results.add("status", success);
+          } else if (h.overseerCollectionQueueContains(requestId)) {
+            SimpleOrderedMap success = new SimpleOrderedMap();
+            success.add("state", "submitted");
+            success.add("msg", "found " + requestId + " in submitted tasks");
+            results.add("status", success);
+          } else {
+            SimpleOrderedMap failure = new SimpleOrderedMap();
+            failure.add("state", "notfound");
+            failure.add("msg", "Did not find taskid [" + requestId + "] in any tasks queue");
+            results.add("status", failure);
+          }
+          SolrResponse response = new OverseerSolrResponse(results);
 
-    rsp.getValues().addAll(response.getResponse());
-  }
+          rsp.getValues().addAll(response.getResponse());
+        }
+        return null;
+      }
 
+    },
+    ADDREPLICA_OP(ADDREPLICA) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h)
+          throws Exception {
+        Map<String, Object> props = req.getParams().getAll(null,
+            COLLECTION_PROP,
+            "node",
+            SHARD_ID_PROP,
+            _ROUTE_,
+            CoreAdminParams.NAME,
+            INSTANCE_DIR,
+            DATA_DIR,
+            ASYNC);
+        return copyPropertiesWithPrefix(req.getParams(), props, COLL_PROP_PREFIX);
+      }
+    },
+    OVERSEERSTATUS_OP(OVERSEERSTATUS) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        return new LinkedHashMap<>();
+      }
+    },
+
+    /**
+     * Handle list collection request.
+     * Do list collection request to zk host
+     */
+    LIST_OP(LIST) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler) throws Exception {
+        NamedList<Object> results = new NamedList<>();
+        Set<String> collections = handler.coreContainer.getZkController().getZkStateReader().getClusterState().getCollections();
+        List<String> collectionList = new ArrayList<>();
+        for (String collection : collections) {
+          collectionList.add(collection);
+        }
+        results.add("collections", collectionList);
+        SolrResponse response = new OverseerSolrResponse(results);
+        rsp.getValues().addAll(response.getResponse());
+        return null;
+      }
+    },
+    /**
+     * Handle cluster status request.
+     * Can return status per specific collection/shard or per all collections.
+     */
+    CLUSTERSTATUS_OP(CLUSTERSTATUS) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler handler)
+          throws KeeperException, InterruptedException {
+        return req.getParams().getAll(null,
+            COLLECTION_PROP,
+            SHARD_ID_PROP,
+            _ROUTE_);
+      }
+    },
+    ADDREPLICAPROP_OP(ADDREPLICAPROP) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null,
+            COLLECTION_PROP,
+            PROPERTY_PROP,
+            SHARD_ID_PROP,
+            REPLICA_PROP,
+            PROPERTY_VALUE_PROP);
+        req.getParams().getAll(map, SHARD_UNIQUE);
+        String property = (String) map.get(PROPERTY_PROP);
+        if (!property.startsWith(COLL_PROP_PREFIX)) {
+          property = COLL_PROP_PREFIX + property;
+        }
 
-  public static ModifiableSolrParams params(String... params) {
-    ModifiableSolrParams msp = new ModifiableSolrParams();
-    for (int i=0; i<params.length; i+=2) {
-      msp.add(params[i], params[i+1]);
+        boolean uniquePerSlice = Boolean.parseBoolean((String) map.get(SHARD_UNIQUE));
+
+        // Check if we're trying to set a property with parameters that allow us to set the property on multiple replicas
+        // in a slice on properties that are known to only be one-per-slice and error out if so.
+        if (StringUtils.isNotBlank((String) map.get(SHARD_UNIQUE)) &&
+            SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(property.toLowerCase(Locale.ROOT)) &&
+            uniquePerSlice == false) {
+          throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+              "Overseer replica property command received for property " + property +
+                  " with the " + SHARD_UNIQUE +
+                  " parameter set to something other than 'true'. No action taken.");
+        }
+        return map;
+      }
+    },
+    DELETEREPLICAPROP_OP(DELETEREPLICAPROP) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null,
+            COLLECTION_PROP,
+            PROPERTY_PROP,
+            SHARD_ID_PROP,
+            REPLICA_PROP);
+        return req.getParams().getAll(map, PROPERTY_PROP);
+      }
+    },
+    BALANCESHARDUNIQUE_OP(BALANCESHARDUNIQUE) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        Map<String, Object> map = req.getParams().required().getAll(null,
+            COLLECTION_PROP,
+            PROPERTY_PROP);
+        Boolean shardUnique = Boolean.parseBoolean(req.getParams().get(SHARD_UNIQUE));
+        String prop = req.getParams().get(PROPERTY_PROP).toLowerCase(Locale.ROOT);
+        if (!StringUtils.startsWith(prop, COLL_PROP_PREFIX)) {
+          prop = COLL_PROP_PREFIX + prop;
+        }
+
+        if (!shardUnique &&
+            !SliceMutator.SLICE_UNIQUE_BOOLEAN_PROPERTIES.contains(prop)) {
+          throw new SolrException(ErrorCode.BAD_REQUEST, "Balancing properties amongst replicas in a slice requires that"
+              + " the property be pre-defined as a unique property (e.g. 'preferredLeader') or that 'shardUnique' be set to 'true'. " +
+              " Property: " + prop + " shardUnique: " + Boolean.toString(shardUnique));
+        }
+
+        return req.getParams().getAll(map, ONLY_ACTIVE_NODES, SHARD_UNIQUE);
+      }
+    },
+    REBALANCELEADERS_OP(REBALANCELEADERS) {
+      @Override
+      Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception {
+        new RebalanceLeaders(req,rsp,h).execute();
+        return null;
+      }
+    };
+    CollectionAction action;
+    long timeOut;
+
+    CollectionOperation(CollectionAction action) {
+      this(action, DEFAULT_ZK_TIMEOUT);
     }
-    return msp;
-  }
 
-  //////////////////////// SolrInfoMBeans methods //////////////////////
+    CollectionOperation(CollectionAction action, long timeOut) {
+      this.action = action;
+      this.timeOut = timeOut;
+    }
 
-  @Override
-  public String getDescription() {
-    return "Manage SolrCloud Collections";
+    /**
+     * All actions must implement this method. If a non null map is returned , the action name is added to
+     * the map and sent to overseer for processing. If it returns a null, the call returns immediately
+     */
+    abstract Map<String, Object> call(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler h) throws Exception;
+
+    public static CollectionOperation get(CollectionAction action) {
+      for (CollectionOperation op : values()) {
+        if (op.action == action) return op;
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR, "No such action" + action);
+    }
   }
-  public static final String SYSTEM_COLL =".system";
 
 }

Added: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java?rev=1679397&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (added)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java Thu May 14 16:25:52 2015
@@ -0,0 +1,327 @@
+package org.apache.solr.handler.admin;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.LeaderElector;
+import org.apache.solr.cloud.OverseerCollectionProcessor;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.zookeeper.KeeperException;
+
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.cloud.OverseerCollectionProcessor.ASYNC;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+
+class RebalanceLeaders {
+  final SolrQueryRequest req;
+  final SolrQueryResponse rsp;
+  final CollectionsHandler collectionsHandler;
+  final CoreContainer coreContainer;
+
+  RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) {
+    this.req = req;
+    this.rsp = rsp;
+    this.collectionsHandler = collectionsHandler;
+    coreContainer = collectionsHandler.getCoreContainer();
+  }
+
+  void execute() throws KeeperException, InterruptedException {
+    req.getParams().required().check(COLLECTION_PROP);
+
+    String collectionName = req.getParams().get(COLLECTION_PROP);
+    if (StringUtils.isBlank(collectionName)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
+    }
+    coreContainer.getZkController().getZkStateReader().updateClusterState(true);
+    ClusterState clusterState = coreContainer.getZkController().getClusterState();
+    DocCollection dc = clusterState.getCollection(collectionName);
+    if (dc == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+    }
+    Map<String, String> currentRequests = new HashMap<>();
+    int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
+    if (max <= 0) max = Integer.MAX_VALUE;
+    int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
+    NamedList<Object> results = new NamedList<>();
+
+    boolean keepGoing = true;
+    for (Slice slice : dc.getSlices()) {
+      insurePreferredIsLeader(results, slice, currentRequests);
+      if (currentRequests.size() == max) {
+        CollectionsHandler.log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+        keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
+        if (keepGoing == false) {
+          break; // If we've waited longer than specified, don't continue to wait!
+        }
+      }
+    }
+    if (keepGoing == true) {
+      keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
+    }
+    if (keepGoing == true) {
+      CollectionsHandler.log.info("All leader reassignments completed.");
+    } else {
+      CollectionsHandler.log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
+    }
+
+    rsp.getValues().addAll(results);
+  }
+
+  private void insurePreferredIsLeader(NamedList<Object> results,
+                                       Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
+    final String inactivePreferreds = "inactivePreferreds";
+    final String alreadyLeaders = "alreadyLeaders";
+    String collectionName = req.getParams().get(COLLECTION_PROP);
+
+    for (Replica replica : slice.getReplicas()) {
+      // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
+      if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
+        continue;
+      }
+      // OK, we are the preferred leader, are we the actual leader?
+      if (replica.getBool(LEADER_PROP, false)) {
+        //We're a preferred leader, but we're _also_ the leader, don't need to do anything.
+        NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
+        if (noops == null) {
+          noops = new NamedList<>();
+          results.add(alreadyLeaders, noops);
+        }
+        NamedList<Object> res = new NamedList<>();
+        res.add("status", "success");
+        res.add("msg", "Already leader");
+        res.add("shard", slice.getName());
+        res.add("nodeName", replica.getNodeName());
+        noops.add(replica.getName(), res);
+        return; // already the leader, do nothing.
+      }
+
+      // We're the preferred leader, but someone else is leader. Only become leader if we're active.
+      if (replica.getState() != Replica.State.ACTIVE) {
+        NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
+        if (inactives == null) {
+          inactives = new NamedList<>();
+          results.add(inactivePreferreds, inactives);
+        }
+        NamedList<Object> res = new NamedList<>();
+        res.add("status", "skipped");
+        res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
+        res.add("shard", slice.getName());
+        res.add("nodeName", replica.getNodeName());
+        inactives.add(replica.getName(), res);
+        return; // Don't try to become the leader if we're not active!
+      }
+
+      // Replica is the preferred leader but not the actual leader, do something about that.
+      // "Something" is
+      // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
+      // 2> tell the actual leader to re-queue itself.
+
+      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+
+      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+      if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
+        CollectionsHandler.log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
+            "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
+        return;
+      }
+
+      // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
+      // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
+      // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
+      // watching the leader node..
+
+      String firstWatcher = electionNodes.get(1);
+
+      if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
+        makeReplicaFirstWatcher(collectionName, slice, replica);
+      }
+
+      String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
+      rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
+      waitForNodeChange(collectionName, slice, electionNodes.get(0));
+
+
+      return; // Done with this slice, skip the rest of the replicas.
+    }
+  }
+  // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
+  void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
+      throws KeeperException, InterruptedException {
+
+    ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+    List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+    // First, queue up the preferred leader at the head of the queue.
+    int newSeq = -1;
+    for (String electionNode : electionNodes) {
+      if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+        String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
+        rejoinElection(collectionName, slice, electionNode, coreName, true);
+        newSeq = waitForNodeChange(collectionName, slice, electionNode);
+        break;
+      }
+    }
+    if (newSeq == -1) {
+      return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
+    }
+
+    List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+
+    // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
+    electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+    for (String thisNode : electionNodes) {
+      if (LeaderElector.getSeq(thisNode) > newSeq) {
+        break;
+      }
+      if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
+        continue;
+      }
+      if (LeaderElector.getSeq(thisNode) == newSeq) {
+        String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
+        rejoinElection(collectionName, slice, thisNode, coreName, false);
+        waitForNodeChange(collectionName, slice, thisNode);
+      }
+    }
+  }
+
+  int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
+    String nodeName = LeaderElector.getNodeName(electionNode);
+    int oldSeq = LeaderElector.getSeq(electionNode);
+    for (int idx = 0; idx < 600; ++idx) {
+      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+      for (String testNode : electionNodes) {
+        if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
+          return LeaderElector.getSeq(testNode);
+        }
+      }
+
+      Thread.sleep(100);
+    }
+    return -1;
+  }
+  private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
+                              boolean rejoinAtHead) throws KeeperException, InterruptedException {
+    Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
+    Map<String, Object> propMap = new HashMap<>();
+    propMap.put(COLLECTION_PROP, collectionName);
+    propMap.put(SHARD_ID_PROP, slice.getName());
+    propMap.put(QUEUE_OPERATION, REBALANCELEADERS.toLower());
+    propMap.put(CORE_NAME_PROP, core);
+    propMap.put(NODE_NAME_PROP, replica.getName());
+    propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
+    propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
+    propMap.put(ELECTION_NODE_PROP, electionNode);
+    String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
+    propMap.put(ASYNC, asyncId);
+    ZkNodeProps m = new ZkNodeProps(propMap);
+    SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
+    collectionsHandler.handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
+  }
+
+  // currentAsyncIds - map of request IDs and reporting data (value)
+  // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
+  // waitForAll - if true, do not return until all assignments have been made.
+  // results - a place to stash results for reporting back to the user.
+  //
+  private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
+                                      Boolean waitForAll, NamedList<Object> results)
+      throws KeeperException, InterruptedException {
+
+    if (currentAsyncIds.size() == 0) return true;
+
+    for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
+      Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
+      boolean foundChange = false;
+      while (iter.hasNext()) {
+        Map.Entry<String, String> pair = iter.next();
+        String asyncId = pair.getKey();
+        if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
+          coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
+          NamedList<Object> fails = (NamedList<Object>) results.get("failures");
+          if (fails == null) {
+            fails = new NamedList<>();
+            results.add("failures", fails);
+          }
+          NamedList<Object> res = new NamedList<>();
+          res.add("status", "failed");
+          res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
+          fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+          iter.remove();
+          foundChange = true;
+        } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
+          coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
+          NamedList<Object> successes = (NamedList<Object>) results.get("successes");
+          if (successes == null) {
+            successes = new NamedList<>();
+            results.add("successes", successes);
+          }
+          NamedList<Object> res = new NamedList<>();
+          res.add("status", "success");
+          res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
+          successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+          iter.remove();
+          foundChange = true;
+        }
+      }
+      // We're done if we're processing a few at a time or all requests are processed.
+      if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
+        return true;
+      }
+      Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
+    }
+    return false;
+  }
+
+
+}

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java?rev=1679397&r1=1679396&r2=1679397&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/SolrParams.java Thu May 14 16:25:52 2015
@@ -25,6 +25,7 @@ import org.apache.solr.common.util.StrUt
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -48,14 +49,14 @@ public abstract class SolrParams impleme
     String val = get(param);
     return val==null ? def : val;
   }
-  
+
   /** returns a RequiredSolrParams wrapping this */
   public RequiredSolrParams required()
   {
     // TODO? should we want to stash a reference?
     return new RequiredSolrParams(this);
   }
-  
+
   protected String fpname(String field, String param) {
     return "f."+field+'.'+param;
   }
@@ -75,7 +76,7 @@ public abstract class SolrParams impleme
     String val = get(fpname(field,param));
     return val!=null ? val : get(param, def);
   }
-  
+
   /** returns the String values of the field parameter, "f.field.param", or
    *  the values for "param" if that is not set.
    */
@@ -95,15 +96,15 @@ public abstract class SolrParams impleme
     String val = get(param);
     return val==null ? def : StrUtils.parseBool(val);
   }
-  
-  /** Returns the Boolean value of the field param, 
+
+  /** Returns the Boolean value of the field param,
       or the value for param, or null if neither is set. */
   public Boolean getFieldBool(String field, String param) {
     String val = getFieldParam(field, param);
     return val==null ? null : StrUtils.parseBool(val);
   }
-  
-  /** Returns the boolean value of the field param, 
+
+  /** Returns the boolean value of the field param,
   or the value for param, or def if neither is set. */
   public boolean getFieldBool(String field, String param, boolean def) {
     String val = getFieldParam(field, param);
@@ -165,8 +166,8 @@ public abstract class SolrParams impleme
 
 
   /**
-   * @return The int value of the field param, or the value for param 
-   * or <code>null</code> if neither is set. 
+   * @return The int value of the field param, or the value for param
+   * or <code>null</code> if neither is set.
    **/
   public Integer getFieldInt(String field, String param) {
     String val = getFieldParam(field, param);
@@ -177,8 +178,8 @@ public abstract class SolrParams impleme
       throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, ex.getMessage(), ex );
     }
   }
-  
-  /** Returns the int value of the field param, 
+
+  /** Returns the int value of the field param,
   or the value for param, or def if neither is set. */
   public int getFieldInt(String field, String param, int def) {
     String val = getFieldParam(field, param);
@@ -323,7 +324,7 @@ public abstract class SolrParams impleme
     // always use MultiMap for easier processing further down the chain
     return new MultiMapSolrParams(toMultiMap(params));
   }
-  
+
   /** Create filtered SolrParams. */
   public SolrParams toFilteredSolrParams(List<String> names) {
     NamedList<String> nl = new NamedList<>();
@@ -338,11 +339,11 @@ public abstract class SolrParams impleme
     }
     return toSolrParams(nl);
   }
-  
+
   /** Convert this to a NamedList */
   public NamedList<Object> toNamedList() {
     final SimpleOrderedMap<Object> result = new SimpleOrderedMap<>();
-    
+
     for(Iterator<String> it=getParameterNamesIterator(); it.hasNext(); ) {
       final String name = it.next();
       final String [] values = getParams(name);
@@ -355,4 +356,22 @@ public abstract class SolrParams impleme
     }
     return result;
   }
+
+  /**Copy all params to the given map or if the given map is null
+   * create a new one
+   */
+  public Map<String, Object> getAll(Map<String, Object> sink, String... params){
+    if(sink == null) sink = new LinkedHashMap<>();
+    for (String param : params) {
+      String[] v = getParams(param);
+      if(v != null && v.length>0 ) {
+        if(v.length == 1) {
+          sink.put(param, v[0]);
+        } else {
+          sink.put(param,v);
+        }
+      }
+    }
+    return sink;
+  }
 }