You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2014/10/17 16:50:14 UTC

svn commit: r1632594 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/ solrj/src/java/org/apache/solr/comm...

Author: erick
Date: Fri Oct 17 14:50:14 2014
New Revision: 1632594

URL: http://svn.apache.org/r1632594
Log:
CollectionsAPI call REBALANCELEADERS

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Fri Oct 17 14:50:14 2014
@@ -178,6 +178,9 @@ New Features
 * SOLR-4715: Add CloudSolrServer constructors which accept a HttpClient instance.
   (Hardik Upadhyay, Shawn Heisey, shalin)
 
+* SOLR-6517: CollectionsAPI call REBALANCELEADERS. Used to balance leaders
+  across nodes for a particular collection
+
 
 Bug Fixes
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Fri Oct 17 14:50:14 2014
@@ -212,7 +212,7 @@ final class ShardLeaderElectionContext e
     
     int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
     if (!weAreReplacement) {
-      waitForReplicasToComeUp(weAreReplacement, leaderVoteWait);
+      waitForReplicasToComeUp(leaderVoteWait);
     }
 
     try (SolrCore core = cc.getCore(coreName)) {
@@ -226,7 +226,7 @@ final class ShardLeaderElectionContext e
       
       // should I be leader?
       if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
-        rejoinLeaderElection(leaderSeqPath, core);
+        rejoinLeaderElection(core);
         return;
       }
       
@@ -297,7 +297,7 @@ final class ShardLeaderElectionContext e
         }
       }
       if (!success) {
-        rejoinLeaderElection(leaderSeqPath, core);
+        rejoinLeaderElection(core);
         return;
       }
 
@@ -323,7 +323,7 @@ final class ShardLeaderElectionContext e
         core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
         
         // we could not publish ourselves as leader - try and rejoin election
-        rejoinLeaderElection(leaderSeqPath, core);
+        rejoinLeaderElection(core);
       }
     }
 
@@ -401,7 +401,7 @@ final class ShardLeaderElectionContext e
     } // core gets closed automagically    
   }
 
-  private void waitForReplicasToComeUp(boolean weAreReplacement, int timeoutms) throws InterruptedException {
+  private void waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
     final String shardsElectZkPath = electionPath + LeaderElector.ELECTION_NODE;
     
@@ -448,11 +448,11 @@ final class ShardLeaderElectionContext e
     }
   }
 
-  private void rejoinLeaderElection(String leaderSeqPath, SolrCore core)
+  private void rejoinLeaderElection(SolrCore core)
       throws InterruptedException, KeeperException, IOException {
     // remove our ephemeral and re join the election
     if (cc.isShutDown()) {
-      log.info("Not rejoining election because CoreContainer is close");
+      log.info("Not rejoining election because CoreContainer is closed");
       return;
     }
     

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Fri Oct 17 14:50:14 2014
@@ -106,7 +106,6 @@ public  class LeaderElector {
         return;
       }
       // first we delete the node advertising the old leader in case the ephem is still there
-      // first we delete the node advertising the old leader in case the ephem is still there
       try {
         zkClient.delete(context.leaderPath, -1, true);
       }catch (KeeperException.NoNodeException nne){
@@ -244,7 +243,7 @@ public  class LeaderElector {
       try {
         if(joinAtHead){
           log.info("node {} Trying to join election at the head ", id);
-          List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient);
+          List<String> nodes = OverseerCollectionProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
           if(nodes.size() <2){
             leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
                 CreateMode.EPHEMERAL_SEQUENTIAL, false);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/Overseer.java Fri Oct 17 14:50:14 2014
@@ -130,7 +130,9 @@ public class Overseer implements Closeab
 
   static enum LeaderStatus {DONT_KNOW, NO, YES}
 
-  public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of("property.preferredleader");
+  public static final String preferredLeaderProp = COLL_PROP_PREFIX + "preferredleader";
+
+  public static final Set<String> sliceUniqueBooleanProperties = ImmutableSet.of(preferredLeaderProp);
 
   private long lastUpdatedTime = 0;
 
@@ -1169,7 +1171,7 @@ public class Overseer implements Closeab
         return null;
       }
 
-    ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
+    private ClusterState updateSlice(ClusterState state, String collectionName, Slice slice) {
         // System.out.println("###!!!### OLD CLUSTERSTATE: " + JSONUtil.toJSON(state.getCollectionStates()));
         // System.out.println("Updating slice:" + slice);
         DocCollection newCollection = null;
@@ -1396,7 +1398,6 @@ public class Overseer implements Closeab
       }
 
   }
-
   // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
   private class ExclusiveSliceProperty {
     private ClusterStateUpdater updater;
@@ -1698,6 +1699,7 @@ public class Overseer implements Closeab
       this.replica = replica;
     }
   }
+
   static void getShardNames(Integer numShards, List<String> shardNames) {
     if(numShards == null)
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "numShards" + " is a required param");

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Fri Oct 17 14:50:14 2014
@@ -18,7 +18,9 @@ package org.apache.solr.cloud;
  */
 
 import static org.apache.solr.cloud.Assign.getNodesForNewShard;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@@ -31,9 +33,9 @@ import static org.apache.solr.common.par
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATESHARD;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICAPROP;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -441,7 +443,7 @@ public class OverseerCollectionProcessor
     String ldr = getLeaderNode(zk);
     if(overseerDesignates.contains(ldr)) return;
     log.info("prioritizing overseer nodes at {} overseer designates are {}", myId, overseerDesignates);
-    List<String> electionNodes = getSortedElectionNodes(zk);
+    List<String> electionNodes = getSortedElectionNodes(zk, OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE);
     if(electionNodes.size()<2) return;
     log.info("sorted nodes {}", electionNodes);
 
@@ -484,10 +486,10 @@ public class OverseerCollectionProcessor
     return nodeNames;
   }
 
-  public static List<String> getSortedElectionNodes(SolrZkClient zk) throws KeeperException, InterruptedException {
+  public static List<String> getSortedElectionNodes(SolrZkClient zk, String path) throws KeeperException, InterruptedException {
     List<String> children = null;
     try {
-      children = zk.getChildren(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE, null, true);
+      children = zk.getChildren(path, null, true);
       LeaderElector.sortSeqs(children);
       return children;
     } catch (Exception e) {
@@ -651,6 +653,9 @@ public class OverseerCollectionProcessor
           case BALANCESLICEUNIQUE:
             balanceProperty(message);
             break;
+          case REBALANCELEADERS:
+            processAssignLeaders(message);
+            break;
           default:
             throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
                 + operation);
@@ -677,6 +682,32 @@ public class OverseerCollectionProcessor
   }
 
   @SuppressWarnings("unchecked")
+  // Handles a REBALANCELEADERS message: validates it, then offers an Overseer LEADER operation for the named core.
+  private void processAssignLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
+    final String collection = message.getStr(COLLECTION_PROP);
+    final String shard = message.getStr(SHARD_ID_PROP);
+    final String url = message.getStr(BASE_URL_PROP);
+    final String core = message.getStr(CORE_NAME_PROP);
+
+    // All four identifiers must be non-blank before anything is put on the Overseer queue.
+    boolean missingParam = StringUtils.isBlank(collection) || StringUtils.isBlank(shard)
+        || StringUtils.isBlank(url) || StringUtils.isBlank(core);
+    if (missingParam) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "The '%s', '%s', '%s' and '%s' parameters are required when assigning a leader",
+              COLLECTION_PROP, SHARD_ID_PROP, BASE_URL_PROP, CORE_NAME_PROP));
+    }
+    Map<String, Object> leaderRequest = new HashMap<>();
+    leaderRequest.put(Overseer.QUEUE_OPERATION, Overseer.OverseerAction.LEADER.toLower());
+    leaderRequest.put(COLLECTION_PROP, collection);
+    leaderRequest.put(SHARD_ID_PROP, shard);
+    leaderRequest.put(BASE_URL_PROP, url);
+    leaderRequest.put(CORE_NAME_PROP, core);
+    Overseer.getInQueue(zkStateReader.getZkClient()).offer(zkStateReader.toJSON(leaderRequest));
+  }
+
+
+  @SuppressWarnings("unchecked")
   private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
     if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
         StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
@@ -684,7 +715,7 @@ public class OverseerCollectionProcessor
         StringUtils.isBlank(message.getStr(PROPERTY_PROP)) ||
         StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete' operations",
+          String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
               COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP));
     }
     SolrZkClient zkClient = zkStateReader.getZkClient();
@@ -702,7 +733,7 @@ public class OverseerCollectionProcessor
         StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
         StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
       throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete' operations",
+          String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
               COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP));
     }
     SolrZkClient zkClient = zkStateReader.getZkClient();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Fri Oct 17 14:50:14 2014
@@ -834,7 +834,13 @@ public final class ZkController {
     ZkNodeProps leaderProps = new ZkNodeProps(props);
     
     try {
-      joinElection(desc, afterExpiration);
+      // If we're a preferred leader, insert ourselves at the head of the queue
+      boolean joinAtHead = false;
+      Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
+      if (replica != null) {
+        joinAtHead = replica.getBool(Overseer.preferredLeaderProp, false);
+      }
+      joinElection(desc, afterExpiration, joinAtHead);
     } catch (InterruptedException e) {
       // Restore the interrupted status
       Thread.currentThread().interrupt();
@@ -988,7 +994,8 @@ public final class ZkController {
   }
 
 
-  private void joinElection(CoreDescriptor cd, boolean afterExpiration) throws InterruptedException, KeeperException, IOException {
+  private void joinElection(CoreDescriptor cd, boolean afterExpiration, boolean joinAtHead)
+      throws InterruptedException, KeeperException, IOException {
     // look for old context - if we find it, cancel it
     String collection = cd.getCloudDescriptor().getCollectionName();
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
@@ -1018,7 +1025,7 @@ public final class ZkController {
 
     leaderElector.setup(context);
     electionContexts.put(contextKey, context);
-    leaderElector.joinElection(context, false);
+    leaderElector.joinElection(context, false, joinAtHead);
   }
 
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Fri Oct 17 14:50:14 2014
@@ -30,16 +30,23 @@ import static org.apache.solr.cloud.Over
 import static org.apache.solr.cloud.OverseerCollectionProcessor.ROUTER;
 import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
 import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
+import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESLICEUNIQUE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.BALANCESLICEUNIQUE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CLUSTERPROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.CREATEALIAS;
@@ -51,6 +58,7 @@ import static org.apache.solr.common.par
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETESHARD;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MIGRATE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.OVERSEERSTATUS;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.RELOAD;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REMOVEROLE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
@@ -80,6 +88,8 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -252,6 +262,10 @@ public class CollectionsHandler extends 
         this.handleBalanceSliceUnique(req, rsp);
         break;
       }
+      case REBALANCELEADERS: {
+        this.handleBalanceLeaders(req, rsp);
+        break;
+      }
       default: {
           throw new RuntimeException("Unknown action: " + action);
       }
@@ -260,6 +274,156 @@ public class CollectionsHandler extends 
     rsp.setHttpCaching(false);
   }
 
+
+  // Handles REBALANCELEADERS: for each slice, asks the active replica carrying the preferredLeader property (and not
+  // already leader) to become leader, waits for the submitted requests, then adds the collected outcomes to rsp.
+  @SuppressWarnings("unchecked") // results.get() is cast to the NamedList<Object> sub-lists this method stores
+  private void handleBalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
+    req.getParams().required().check(COLLECTION_PROP);
+    String collectionName = req.getParams().get(COLLECTION_PROP);
+    if (StringUtils.isBlank(collectionName)) {
+      throw new SolrException(ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "The %s is required for the REBALANCELEADERS command.", COLLECTION_PROP));
+    }
+    coreContainer.getZkController().getZkStateReader().updateClusterState(true);
+    ClusterState clusterState = coreContainer.getZkController().getClusterState();
+    DocCollection dc = clusterState.getCollection(collectionName);
+    if (dc == null) {
+      throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+    }
+    Map<String, String> current = new HashMap<>();
+    int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
+    if (max <= 0) max = Integer.MAX_VALUE;
+    int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
+    NamedList<Object> results = new NamedList<>();
+    SolrQueryResponse rspIgnore = new SolrQueryResponse();
+    final String inactivePreferreds = "inactivePreferreds";
+    final String alreadyLeaders = "alreadyLeaders";
+    boolean keepGoing = true;
+    for (Slice slice : dc.getSlices()) {
+      for (Replica replica : slice.getReplicas()) {
+        // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
+        if (replica.getBool(Overseer.preferredLeaderProp, false) == false) {
+          continue;
+        }
+        if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
+          NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
+          if (inactives == null) {
+            inactives = new NamedList<>();
+            results.add(inactivePreferreds, inactives);
+          }
+          NamedList<Object> res = new NamedList<>();
+          res.add("status", "skipped");
+          res.add("msg", "Node is a preferredLeader, but it's inactive. Skipping");
+          res.add("nodeName", replica.getNodeName());
+          inactives.add(replica.getName(), res);
+          break; // Don't try to assign if we're not active!
+        }
+        if (replica.getBool(LEADER_PROP, false)) {
+          NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
+          if (noops == null) {
+            noops = new NamedList<>();
+            results.add(alreadyLeaders, noops);
+          }
+          NamedList<Object> res = new NamedList<>();
+          res.add("status", "success");
+          res.add("msg", "Already leader");
+          res.add("nodeName", replica.getNodeName());
+          noops.add(replica.getName(), res);
+          break; // already the leader, do nothing.
+        }
+        // OK, we're the one, get in the queue to become the leader.
+        Map<String, Object> propMap = new HashMap<>();
+        propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
+        propMap.put(COLLECTION_PROP, collectionName);
+        propMap.put(SHARD_ID_PROP, slice.getName());
+        propMap.put(BASE_URL_PROP, replica.get(BASE_URL_PROP));
+        String coreName = (String) replica.get(CORE_NAME_PROP);
+        // Put it in the waiting list.
+        String asyncId = REBALANCELEADERS.toLower() + "_" + coreName;
+        current.put(asyncId, String.format(Locale.ROOT, "Collection: '%s', Shard: '%s', Core: '%s', BaseUrl: '%s'",
+            collectionName, slice.getName(), coreName, replica.get(BASE_URL_PROP)));
+        propMap.put(CORE_NAME_PROP, coreName);
+        propMap.put(ASYNC, asyncId);
+        ZkNodeProps m = new ZkNodeProps(propMap);
+        log.info("Queueing collection '" + collectionName + "' slice '" + slice.getName() + "' replica '" +
+                coreName + "' to become leader.");
+        handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
+        break; // Done with this slice, skip the rest of the replicas.
+      }
+      if (current.size() == max) {
+        log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+        keepGoing = waitForLeaderChange(current, maxWaitSecs, false, results);
+        if (keepGoing == false) {
+          break; // If we've waited longer than specified, don't continue to wait!
+        }
+      }
+    }
+    if (keepGoing == true) {
+      keepGoing = waitForLeaderChange(current, maxWaitSecs, true, results);
+    }
+    if (keepGoing == true) {
+      log.info("All leader reassignments completed.");
+    } else {
+      log.warn("Exceeded specified timeout of '" + maxWaitSecs + "' seconds, all leaders may not have been reassigned");
+    }
+
+    rsp.getValues().addAll(results);
+  }
+
+  // Polls the Overseer failure/completed maps for the given async requests; returns false if the polling budget runs out.
+  // currentAsyncIds - map of request IDs (key) and reporting data (value); finished requests are removed from it.
+  // maxWaitSecs - How long are we going to wait, in seconds? handleBalanceLeaders defaults this to 60.
+  // waitForAll - if true, do not return until all assignments have been made; if false, return after any change is seen.
+  // results - a place to stash results (the "failures"/"successes" lists) for reporting back to the user.
+  private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
+                                      Boolean waitForAll, NamedList<Object> results)
+      throws KeeperException, InterruptedException {
+
+    if (currentAsyncIds.size() == 0) return true;
+
+    for (int idx = 0; idx < maxWaitSecs * 10; ++idx) { // ten polls per second, given the 100 ms sleep below
+      Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
+      boolean foundChange = false;
+      while (iter.hasNext()) {
+        Map.Entry<String, String> pair = iter.next();
+        String asyncId = pair.getKey();
+        if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
+          coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
+          NamedList<Object> fails = (NamedList<Object>) results.get("failures");
+          if (fails == null) {
+            fails = new NamedList<>();
+            results.add("failures", fails);
+          }
+          NamedList<Object> res = new NamedList<>();
+          res.add("status", "failed");
+          res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
+          fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res); // NOTE(review): key keeps the "_" that follows the prefix in the async id
+          iter.remove();
+          foundChange = true;
+        } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
+          coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
+          NamedList<Object> successes = (NamedList<Object>) results.get("successes");
+          if (successes == null) {
+            successes = new NamedList<>();
+            results.add("successes", successes);
+          }
+          NamedList<Object> res = new NamedList<>();
+          res.add("status", "success");
+          res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
+          successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
+          iter.remove();
+          foundChange = true;
+        }
+      }
+      // We're done if we're processing a few at a time (and saw at least one finish) or all requests are processed.
+      if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
+        return true;
+      }
+      Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
+    }
+    return false;
+  }
   private void handleAddReplicaProp(SolrQueryRequest req, SolrQueryResponse rsp) throws KeeperException, InterruptedException {
     req.getParams().required().check(COLLECTION_PROP, PROPERTY_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_VALUE_PROP);
 
@@ -425,7 +589,7 @@ public class CollectionsHandler extends 
        }
  
        NamedList<String> r = new NamedList<>();
- 
+
        if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
            coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
            coreContainer.getZkController().getOverseerRunningMap().contains(asyncId) ||

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerRolesTest.java Fri Oct 17 14:50:14 2014
@@ -197,7 +197,9 @@ public class OverseerRolesTest  extends 
     assertNotNull("Could not find a jetty2 kill",  leaderJetty);
 
     log.info("leader node {}", leaderJetty.getBaseUrl());
-    log.info ("current election Queue", OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient()));
+    log.info ("current election Queue",
+        OverseerCollectionProcessor.getSortedElectionNodes(client.getZkStateReader().getZkClient(),
+            OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
     ChaosMonkey.stop(leaderJetty);
     timeout = System.currentTimeMillis() + 10000;
     leaderchanged = false;

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RollingRestartTest.java Fri Oct 17 14:50:14 2014
@@ -106,7 +106,9 @@ public class RollingRestartTest extends 
           if (!success) {
             leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
             if (leader == null)
-              log.error("NOOVERSEER election queue is :" + OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient()));
+              log.error("NOOVERSEER election queue is :" +
+                  OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
+                      OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
             fail("No overseer designate as leader found after restart #" + (i + 1) + ": " + leader);
           }
         }
@@ -115,7 +117,9 @@ public class RollingRestartTest extends 
         if (!success) {
           leader = OverseerCollectionProcessor.getLeaderNode(cloudClient.getZkStateReader().getZkClient());
           if (leader == null)
-            log.error("NOOVERSEER election queue is :" + OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient()));
+            log.error("NOOVERSEER election queue is :" +
+                OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
+                    OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE));
           fail("No overseer leader found after restart #" + (i + 1) + ": " + leader);
         }
         

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestReplicaProperties.java Fri Oct 17 14:50:14 2014
@@ -29,10 +29,13 @@ import org.apache.solr.client.solrj.Solr
 import org.apache.solr.client.solrj.impl.CloudSolrServer;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Before;
 
 @Slow
@@ -61,7 +64,7 @@ public class TestReplicaProperties exten
       // shards, replicationfactor, maxreplicaspernode
       int shards = random().nextInt(7);
       if (shards < 2) shards = 2;
-      int rFactor = random().nextInt(3);
+      int rFactor = random().nextInt(4);
       if (rFactor < 2) rFactor = 2;
       createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
     } finally {
@@ -186,11 +189,56 @@ public class TestReplicaProperties exten
       verifyPropertyVal(client, COLLECTION_NAME,
           c1_s1_r2, "property.bogus1", "whatever");
 
+      // At this point we've assigned a preferred leader. Make it happen and check that all the nodes that are
+      // leaders _also_ have the preferredLeader property set.
+
+
+      doPropertyAction(client,
+          "action", CollectionParams.CollectionAction.REBALANCELEADERS.toString(),
+          "collection", COLLECTION_NAME);
+
+      verifyLeaderAssignment(client, COLLECTION_NAME);
+
     } finally {
       client.shutdown();
     }
   }
 
+  /** Polls cluster state until each replica's leader flag matches its preferredLeader flag (at most one per slice), else fails. */
+  private void verifyLeaderAssignment(CloudSolrServer client, String collectionName)
+      throws InterruptedException, KeeperException {
+    String lastFailMsg = "";
+    for (int idx = 0; idx < 300; ++idx) { // Keep trying while Overseer writes the ZK state for up to 30 seconds.
+      lastFailMsg = "";
+      client.getZkStateReader().updateClusterState(true);
+      ClusterState clusterState = client.getZkStateReader().getClusterState();
+      for (Slice slice : clusterState.getSlices(collectionName)) {
+        boolean foundLeader = false; // primitives, so the != and && checks below compare values, never boxed references
+        boolean foundPreferred = false;
+        for (Replica replica : slice.getReplicas()) {
+          boolean isLeader = replica.getBool("leader", false);
+          boolean isPreferred = replica.getBool("property.preferredleader", false);
+          if (isLeader != isPreferred) {
+            lastFailMsg = "Replica should NOT have preferredLeader != leader. Preferred: " + isPreferred + " leader is " + isLeader;
+          }
+          if (foundLeader && isLeader) {
+            lastFailMsg = "There should only be a single leader in _any_ shard! Replica " + replica.getName() +
+                " is the second leader in slice " + slice.getName();
+          }
+          if (foundPreferred && isPreferred) {
+            lastFailMsg = "There should only be a single preferredLeader in _any_ shard! Replica " + replica.getName() +
+                " is the second preferredLeader in slice " + slice.getName();
+          }
+          foundLeader |= isLeader;
+          foundPreferred |= isPreferred;
+        }
+      }
+      if (lastFailMsg.isEmpty()) return;
+      Thread.sleep(100);
+    }
+    fail(lastFailMsg);
+  }
+
   private void addProperty(CloudSolrServer client, String... paramsIn) throws IOException, SolrServerException {
     assertTrue("paramsIn must be an even multiple of 2, it is: " + paramsIn.length, (paramsIn.length % 2) == 0);
     ModifiableSolrParams params = new ModifiableSolrParams();

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Fri Oct 17 14:50:14 2014
@@ -71,7 +71,8 @@ public class ZkStateReader implements Cl
   public static final String LEADER_PROP = "leader";
   public static final String PROPERTY_PROP = "property";
   public static final String PROPERTY_VALUE_PROP = "property.value";
-  
+  public static final String MAX_AT_ONCE_PROP = "maxAtOnce";
+  public static final String MAX_WAIT_SECONDS_PROP = "maxWaitSeconds";
   public static final String COLLECTIONS_ZKNODE = "/collections";
   public static final String LIVE_NODES_ZKNODE = "/live_nodes";
   public static final String ALIASES = "/aliases.json";

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java?rev=1632594&r1=1632593&r2=1632594&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java Fri Oct 17 14:50:14 2014
@@ -49,7 +49,8 @@ public interface CollectionParams 
     CLUSTERSTATUS,
     ADDREPLICAPROP,
     DELETEREPLICAPROP,
-    BALANCESLICEUNIQUE;
+    BALANCESLICEUNIQUE,
+    REBALANCELEADERS;
     
     public static CollectionAction get( String p )
     {