You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2014/12/24 23:55:36 UTC

svn commit: r1647857 - in /lucene/dev/trunk/solr: core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/cloud/ solrj/src/java/org/apache/solr/common/...

Author: erick
Date: Wed Dec 24 22:55:36 2014
New Revision: 1647857

URL: http://svn.apache.org/r1647857
Log:
SOLR-6691: REBALANCELEADERS needs to change the leader election queue. Going to let this bake in trunk until 5.0 is cut.

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Wed Dec 24 22:55:36 2014
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -86,7 +87,7 @@ public  class LeaderElector {
    *
    * @param replacement has someone else been the leader already?
    */
-  private void checkIfIamLeader(final int seq, final ElectionContext context, boolean replacement) throws KeeperException,
+  private void checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
       InterruptedException, IOException {
     context.checkIfIamLeaderFired();
     // get all other numbers...
@@ -99,10 +100,44 @@ public  class LeaderElector {
       log.warn("Our node is no longer in line to be leader");
       return;
     }
+    // We can't really rely on the sequence number stored in the old watcher, it may be stale, thus this check.
+
+    int seq = -1;
+
+    // See if we've already been re-added, and this is an old context. In which case, use our current sequence number.
+    String newLeaderSeq = "";
+    for (String elec : seqs) {
+      if (getNodeName(elec).equals(getNodeName(context.leaderSeqPath)) && seq < getSeq(elec)) {
+        seq = getSeq(elec); // so use the current sequence number.
+        newLeaderSeq = elec;
+        break;
+      }
+    }
+
+    // Now, if we've been re-added, presumably we've also set up watchers and all that kind of thing, so we're done
+    if (StringUtils.isNotBlank(newLeaderSeq) && seq > getSeq(context.leaderSeqPath)) {
+      log.info("Node " + context.leaderSeqPath + " already in queue as " + newLeaderSeq + " nothing to do.");
+      return;
+    }
+
+    // Fallback in case we're all coming in here fresh and there is no node for this core already in the election queue.
+    if (seq == -1) {
+      seq = getSeq(context.leaderSeqPath);
+    }
+
     if (seq <= intSeqs.get(0)) {
-      if(seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath+"/"+seqs.get(0)) ) {//somebody else already  became the leader with the same sequence id , not me
-        log.info("was going be leader {} , seq(0) {}",context.leaderSeqPath,holdElectionPath+"/"+seqs.get(0));//but someone else jumped the line
-        retryElection(context,false);//join at the tail again
+      if (seq == intSeqs.get(0) && !context.leaderSeqPath.equals(holdElectionPath + "/" + seqs.get(0))) {//somebody else already  became the leader with the same sequence id , not me
+        log.info("was going to be leader {} , seq(0) {}", context.leaderSeqPath, holdElectionPath + "/" + seqs.get(0));//but someone else jumped the line
+
+        // The problem is that deleting the ZK node that's watched by others
+        // results in an unpredictable sequencing of the events and sometimes the context that comes in for checking
+        // this happens to be after the node has already taken over leadership. So just leave out of here.
+        // This caused one of the tests to fail on having two nodes with the same name in the queue. I'm not sure
+        // the assumption that this is a bad state is valid.
+        if (getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(0)))) {
+          return;
+        }
+        retryElection(context, false);//join at the tail again
         return;
       }
       // first we delete the node advertising the old leader in case the ephem is still there
@@ -129,21 +164,22 @@ public  class LeaderElector {
       }
     } else {
       // I am not the leader - watch the node below me
-      int i = 1;
-      for (; i < intSeqs.size(); i++) {
-        int s = intSeqs.get(i);
-        if (seq < s) {
-          // we found who we come before - watch the guy in front
+      int toWatch = -1;
+      for (int idx = 0; idx < intSeqs.size(); idx++) {
+        if (intSeqs.get(idx) < seq && ! getNodeName(context.leaderSeqPath).equals(getNodeName(seqs.get(idx)))) {
+          toWatch = idx;
+        }
+        if (intSeqs.get(idx) >= seq) {
           break;
         }
       }
-      int index = i - 2;
-      if (index < 0) {
+      if (toWatch < 0) {
         log.warn("Our node is no longer in line to be leader");
         return;
       }
       try {
-        String watchedNode = holdElectionPath + "/" + seqs.get(index);
+        String watchedNode = holdElectionPath + "/" + seqs.get(toWatch);
+
         zkClient.getData(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath , watchedNode,seq, context) , null, true);
       } catch (KeeperException.SessionExpiredException e) {
         throw e;
@@ -151,7 +187,7 @@ public  class LeaderElector {
         log.warn("Failed setting watch", e);
         // we couldn't set our watch - the node before us may already be down?
         // we need to check if we are the leader again
-        checkIfIamLeader(seq, context, true);
+        checkIfIamLeader(context, true);
       }
     }
   }
@@ -309,15 +345,13 @@ public  class LeaderElector {
         }
       }
     }
-    int seq = getSeq(leaderSeqPath);
-    checkIfIamLeader(seq, context, replacement);
-    
-    return seq;
+    checkIfIamLeader(context, replacement);
+
+    return getSeq(context.leaderSeqPath);
   }
 
   private class ElectionWatcher implements Watcher {
     final String myNode,watchedNode;
-    final int seq;
     final ElectionContext context;
 
     private boolean canceled = false;
@@ -325,11 +359,10 @@ public  class LeaderElector {
     private ElectionWatcher(String myNode, String watchedNode, int seq, ElectionContext context) {
       this.myNode = myNode;
       this.watchedNode = watchedNode;
-      this.seq = seq;
       this.context = context;
     }
 
-    void cancel(String leaderSeqPath){
+    void cancel() {
       canceled = true;
 
     }
@@ -354,7 +387,7 @@ public  class LeaderElector {
       }
       try {
         // am I the next leader?
-        checkIfIamLeader(seq, context, true);
+        checkIfIamLeader(context, true);
       } catch (Exception e) {
         log.warn("", e);
       }
@@ -390,7 +423,7 @@ public  class LeaderElector {
   void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
     ElectionWatcher watcher = this.watcher;
     ElectionContext ctx = context.copy();
-    if(watcher!= null) watcher.cancel(this.context.leaderSeqPath);
+    if (watcher != null) watcher.cancel();
     this.context.cancelElection();
     this.context = ctx;
     joinElection(ctx, true, joinAtHead);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Wed Dec 24 22:55:36 2014
@@ -21,6 +21,9 @@ import static org.apache.solr.cloud.Assi
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@@ -659,7 +662,7 @@ public class OverseerCollectionProcessor
             balanceProperty(message);
             break;
           case REBALANCELEADERS:
-            processAssignLeaders(message);
+            processRebalanceLeaders(message);
             break;
           default:
             throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
@@ -687,42 +690,36 @@ public class OverseerCollectionProcessor
   }
 
   @SuppressWarnings("unchecked")
-  // re-purpose BALANCELEADERS to reassign a single leader over here
-  private void processAssignLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
-    String collectionName = message.getStr(COLLECTION_PROP);
-    String shardId = message.getStr(SHARD_ID_PROP);
-    String baseURL = message.getStr(BASE_URL_PROP);
-    String coreName = message.getStr(CORE_NAME_PROP);
-
-    if (StringUtils.isBlank(collectionName) || StringUtils.isBlank(shardId) || StringUtils.isBlank(baseURL) ||
-        StringUtils.isBlank(coreName)) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The '%s', '%s', '%s' and '%s' parameters are required when assigning a leader",
-              COLLECTION_PROP, SHARD_ID_PROP, BASE_URL_PROP, CORE_NAME_PROP));
-    }
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    DistributedQueue inQueue = Overseer.getInQueue(zkClient);
-    Map<String, Object> propMap = new HashMap<>();
-    propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower());
-    propMap.put(COLLECTION_PROP, collectionName);
-    propMap.put(SHARD_ID_PROP, shardId);
-    propMap.put(BASE_URL_PROP, baseURL);
-    propMap.put(CORE_NAME_PROP, coreName);
-    inQueue.offer(zkStateReader.toJSON(propMap));
-  }
+  private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
+        NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
 
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
+    params.set(SHARD_ID_PROP, message.getStr(SHARD_ID_PROP));
+    params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
+    params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
+    params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
+    params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
+    params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
+    params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
+
+    String baseUrl = message.getStr(BASE_URL_PROP);
+    ShardRequest sreq = new ShardRequest();
+    sreq.nodeName = message.getStr(ZkStateReader.CORE_NAME_PROP);
+    // yes, they must use same admin handler path everywhere...
+    params.set("qt", adminPath);
+    sreq.purpose = ShardRequest.PURPOSE_PRIVATE;
+    sreq.shards = new String[] {baseUrl};
+    sreq.actualShards = sreq.shards;
+    sreq.params = params;
+    ShardHandler shardHandler = shardHandlerFactory.getShardHandler();
+    shardHandler.submit(sreq, baseUrl, sreq.params);
+  }
 
   @SuppressWarnings("unchecked")
   private void processReplicaAddPropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
-    if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
-        StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
-        StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
-        StringUtils.isBlank(message.getStr(PROPERTY_PROP)) ||
-        StringUtils.isBlank(message.getStr(PROPERTY_VALUE_PROP))) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The '%s', '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
-              COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP));
-    }
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
     SolrZkClient zkClient = zkStateReader.getZkClient();
     DistributedQueue inQueue = Overseer.getInQueue(zkClient);
     Map<String, Object> propMap = new HashMap<>();
@@ -733,14 +730,7 @@ public class OverseerCollectionProcessor
   }
 
   private void processReplicaDeletePropertyCommand(ZkNodeProps message) throws KeeperException, InterruptedException {
-    if (StringUtils.isBlank(message.getStr(COLLECTION_PROP)) ||
-        StringUtils.isBlank(message.getStr(SHARD_ID_PROP)) ||
-        StringUtils.isBlank(message.getStr(REPLICA_PROP)) ||
-        StringUtils.isBlank(message.getStr(PROPERTY_PROP))) {
-      throw new SolrException(ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The '%s', '%s', '%s', and '%s' parameters are required for all replica properties add/delete operations",
-              COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP));
-    }
+    checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
     SolrZkClient zkClient = zkStateReader.getZkClient();
     DistributedQueue inQueue = Overseer.getInQueue(zkClient);
     Map<String, Object> propMap = new HashMap<>();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Dec 24 22:55:36 2014
@@ -80,6 +80,15 @@ import org.apache.solr.core.SolrResource
 import org.apache.solr.handler.component.ShardHandler;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateShardHandler;
+
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -1022,7 +1031,7 @@ public final class ZkController {
  
     ZkNodeProps ourProps = new ZkNodeProps(props);
 
-    
+
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
         collection, coreNodeName, ourProps, this, cc);
 
@@ -1860,6 +1869,31 @@ public final class ZkController {
 
   }
 
+  public void rejoinShardLeaderElection(SolrParams params) {
+    try {
+      String collectionName = params.get(COLLECTION_PROP);
+      String shardId = params.get(SHARD_ID_PROP);
+      String nodeName = params.get(NODE_NAME_PROP);
+      String coreName = params.get(CORE_NAME_PROP);
+      String electionNode = params.get(ELECTION_NODE_PROP);
+      String baseUrl = params.get(BASE_URL_PROP);
+
+      ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
+          SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);
+
+      ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
+          nodeName, zkProps, this, getCoreContainer());
+      LeaderElector elect = new LeaderElector(this.zkClient);
+      context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
+      elect.setup(context);
+
+      elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
+    }
+
+  }
+
   public void checkOverseerDesignate() {
     try {
       byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);
@@ -2280,7 +2314,7 @@ public final class ZkController {
 
   private void setConfWatcher(String zkDir, Watcher watcher) {
     try {
-      zkClient.exists(zkDir,watcher,true);
+      zkClient.exists(zkDir, watcher, true);
     } catch (KeeperException e) {
       log.error("failed to set watcher for conf dir {} ", zkDir);
     } catch (InterruptedException e) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Wed Dec 24 22:55:36 2014
@@ -32,10 +32,11 @@ import static org.apache.solr.cloud.Over
 import static org.apache.solr.cloud.OverseerCollectionProcessor.SHARDS_PROP;
 import static org.apache.solr.common.cloud.ZkNodeProps.makeMap;
 import static org.apache.solr.common.cloud.ZkStateReader.ACTIVE;
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
@@ -44,6 +45,7 @@ import static org.apache.solr.common.clo
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.STATE_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
@@ -69,6 +71,7 @@ import java.nio.charset.StandardCharsets
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -82,6 +85,7 @@ import org.apache.solr.client.solrj.requ
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.cloud.DistributedQueue;
 import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.LeaderElector;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerCollectionProcessor;
 import org.apache.solr.cloud.OverseerSolrResponse;
@@ -295,78 +299,25 @@ public class CollectionsHandler extends
     if (dc == null) {
       throw new SolrException(ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
     }
-    Map<String, String> current = new HashMap<>();
+    Map<String, String> currentRequests = new HashMap<>();
     int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
     if (max <= 0) max = Integer.MAX_VALUE;
     int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
     NamedList<Object> results = new NamedList<>();
-    SolrQueryResponse rspIgnore = new SolrQueryResponse();
-    final String inactivePreferreds = "inactivePreferreds";
-    final String alreadyLeaders = "alreadyLeaders";
+
     boolean keepGoing = true;
     for (Slice slice : dc.getSlices()) {
-      for (Replica replica : slice.getReplicas()) {
-        // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
-        if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
-          continue;
-        }
-        if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
-          NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
-          if (inactives == null) {
-            inactives = new NamedList<>();
-            results.add(inactivePreferreds, inactives);
-          }
-          NamedList<Object> res = new NamedList<>();
-          res.add("status", "skipped");
-          res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
-          res.add("nodeName", replica.getNodeName());
-          inactives.add(replica.getName(), res);
-          break; // Don't try to assign if we're not active!
-        }        // OK, we're the one, get in the queue to become the leader.
-        if (replica.getBool(LEADER_PROP, false)) {
-          NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
-          if (noops == null) {
-            noops = new NamedList<>();
-            results.add(alreadyLeaders, noops);
-          }
-          NamedList<Object> res = new NamedList<>();
-          res.add("status", "success");
-          res.add("msg", "Already leader");
-          res.add("nodeName", replica.getNodeName());
-          noops.add(replica.getName(), res);
-          break; // already the leader, do nothing.
-        }
-        Map<String, Object> propMap = new HashMap<>();
-        propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
-        propMap.put(COLLECTION_PROP, collectionName);
-        propMap.put(SHARD_ID_PROP, slice.getName());
-        propMap.put(BASE_URL_PROP, replica.get(BASE_URL_PROP));
-
-        String coreName = (String) replica.get(CORE_NAME_PROP);
-        // Put it in the waiting list.
-        String asyncId = REBALANCELEADERS.toLower() + "_" + coreName;
-        current.put(asyncId, String.format(Locale.ROOT, "Collection: '%s', Shard: '%s', Core: '%s', BaseUrl: '%s'",
-            collectionName, slice.getName(), coreName, replica.get(BASE_URL_PROP)));
-
-        propMap.put(CORE_NAME_PROP, coreName);
-        propMap.put(ASYNC, asyncId);
-
-        ZkNodeProps m = new ZkNodeProps(propMap);
-        log.info("Queueing collection '" + collectionName + "' slice '" + slice.getName() + "' replica '" +
-                coreName + "' to become leader.");
-        handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
-        break; // Done with this slice, skip the rest of the replicas.
-      }
-      if (current.size() == max) {
-        log.info("Queued " + max + " leader reassgnments, waiting for some to complete.");
-        keepGoing = waitForLeaderChange(current, maxWaitSecs, false, results);
+      insurePreferredIsLeader(req, results, slice, currentRequests);
+      if (currentRequests.size() == max) {
+        log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+        keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
         if (keepGoing == false) {
           break; // If we've waited longer than specified, don't continue to wait!
         }
       }
     }
     if (keepGoing == true) {
-      keepGoing = waitForLeaderChange(current, maxWaitSecs, true, results);
+      keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
     }
     if (keepGoing == true) {
       log.info("All leader reassignments completed.");
@@ -377,6 +328,166 @@ public class CollectionsHandler extends
     rsp.getValues().addAll(results);
   }
 
+  private void insurePreferredIsLeader(SolrQueryRequest req, NamedList<Object> results,
+                                              Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
+    final String inactivePreferreds = "inactivePreferreds";
+    final String alreadyLeaders = "alreadyLeaders";
+    String collectionName = req.getParams().get(COLLECTION_PROP);
+
+    for (Replica replica : slice.getReplicas()) {
+      // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
+      if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
+        continue;
+      }
+      // OK, we are the preferred leader, are we the actual leader?
+      if (replica.getBool(LEADER_PROP, false)) {
+        //We're a preferred leader, but we're _also_ the leader, don't need to do anything.
+        NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
+        if (noops == null) {
+          noops = new NamedList<>();
+          results.add(alreadyLeaders, noops);
+        }
+        NamedList<Object> res = new NamedList<>();
+        res.add("status", "success");
+        res.add("msg", "Already leader");
+        res.add("shard", slice.getName());
+        res.add("nodeName", replica.getNodeName());
+        noops.add(replica.getName(), res);
+        return; // already the leader, do nothing.
+      }
+
+      // We're the preferred leader, but someone else is leader. Only become leader if we're active.
+      if (StringUtils.equalsIgnoreCase(replica.getStr(STATE_PROP), ACTIVE) == false) {
+        NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
+        if (inactives == null) {
+          inactives = new NamedList<>();
+          results.add(inactivePreferreds, inactives);
+        }
+        NamedList<Object> res = new NamedList<>();
+        res.add("status", "skipped");
+        res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
+        res.add("shard", slice.getName());
+        res.add("nodeName", replica.getNodeName());
+        inactives.add(replica.getName(), res);
+        return; // Don't try to become the leader if we're not active!
+      }
+
+      // Replica is the preferred leader but not the actual leader, do something about that.
+      // "Something" is
+      // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
+      // 2> tell the actual leader to re-queue itself.
+
+      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+
+      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+      if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
+        log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
+            "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
+        return;
+      }
+
+      // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
+      // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
+      // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
+      // watching the leader node..
+
+      String firstWatcher = electionNodes.get(1);
+
+      if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
+        makeReplicaFirstWatcher(collectionName, slice, replica);
+      }
+
+      String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
+      rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
+      waitForNodeChange(collectionName, slice, electionNodes.get(0));
+
+
+      return; // Done with this slice, skip the rest of the replicas.
+    }
+  }
+  // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
+  void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
+      throws KeeperException, InterruptedException {
+
+    ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+    List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+    // First, queue up the preferred leader at the head of the queue.
+    int newSeq = -1;
+    for (String electionNode : electionNodes) {
+      if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+        String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
+        rejoinElection(collectionName, slice, electionNode, coreName, true);
+        newSeq = waitForNodeChange(collectionName, slice, electionNode);
+        break;
+      }
+    }
+    if (newSeq == -1) {
+      return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
+    }
+
+    List<String> electionNodesTmp = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+
+    // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
+    electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+
+    for (String thisNode : electionNodes) {
+      if (LeaderElector.getSeq(thisNode) > newSeq) {
+        break;
+      }
+      if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
+        continue;
+      }
+      if (LeaderElector.getSeq(thisNode) == newSeq) {
+        String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
+        rejoinElection(collectionName, slice, thisNode, coreName, false);
+        waitForNodeChange(collectionName, slice, thisNode);
+      }
+    }
+  }
+
+  int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
+    String nodeName = LeaderElector.getNodeName(electionNode);
+    int oldSeq = LeaderElector.getSeq(electionNode);
+     for (int idx = 0; idx < 600; ++idx) {
+      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
+      List<String> electionNodes = OverseerCollectionProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
+          ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
+      for (String testNode : electionNodes) {
+        if (LeaderElector.getNodeName(testNode).equals(nodeName) && oldSeq != LeaderElector.getSeq(testNode)) {
+          return LeaderElector.getSeq(testNode);
+        }
+      }
+
+      Thread.sleep(100);
+    }
+    return -1;
+  }
+  private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
+                              boolean rejoinAtHead) throws KeeperException, InterruptedException {
+    Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
+    Map<String, Object> propMap = new HashMap<>();
+    propMap.put(COLLECTION_PROP, collectionName);
+    propMap.put(SHARD_ID_PROP, slice.getName());
+    propMap.put(Overseer.QUEUE_OPERATION, REBALANCELEADERS.toLower());
+    propMap.put(CORE_NAME_PROP, core);
+    propMap.put(NODE_NAME_PROP, replica.getName());
+    propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
+    propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
+    propMap.put(ELECTION_NODE_PROP, electionNode);
+    String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
+    propMap.put(ASYNC, asyncId);
+    ZkNodeProps m = new ZkNodeProps(propMap);
+    SolrQueryResponse rspIgnore = new SolrQueryResponse(); // I'm constructing my own response
+    handleResponse(REBALANCELEADERS.toLower(), m, rspIgnore); // Want to construct my own response here.
+  }
+
   // currentAsyncIds - map of request IDs and reporting data (value)
   // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
   // waitForAll - if true, do not return until all assignments have been made.

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Wed Dec 24 22:55:36 2014
@@ -288,6 +288,16 @@ public class CoreAdminHandler extends Re
         }
         case LOAD:
           break;
+
+        case REJOINLEADERELECTION:
+          ZkController zkController = coreContainer.getZkController();
+
+          if (zkController != null) {
+            zkController.rejoinShardLeaderElection(req.getParams());
+          } else {
+            log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
+          }
+          break;
       }
     }
     rsp.setHttpCaching(false);

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderElectionTest.java Wed Dec 24 22:55:36 2014
@@ -230,7 +230,7 @@ public class LeaderElectionTest extends
     props = new ZkNodeProps(ZkStateReader.BASE_URL_PROP,
         "http://127.0.0.1/solr/", ZkStateReader.CORE_NAME_PROP, "2");
     ElectionContext context = new ShardLeaderElectionContextBase(second,
-        "slice1", "collection2", "dummynode1", props, zkStateReader);
+        "slice1", "collection2", "dummynode2", props, zkStateReader);
     second.setup(context);
     second.joinElection(context, false);
     Thread.sleep(1000);

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java?rev=1647857&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java Wed Dec 24 22:55:36 2014
@@ -0,0 +1,340 @@
+package org.apache.solr.cloud;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+
+
+public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
+
+  public static final String COLLECTION_NAME = "testcollection";
+
+  /**
+   * Selects the schema file used by this test.
+   */
+  public TestRebalanceLeaders() {
+    schemaString = "schema15.xml";      // we need a string id
+  }
+
+  /**
+   * Assigns the slice/shard sizing fields and then runs the superclass set-up.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    // Sizing is assigned first so it is already in place when super.setUp() runs.
+    sliceCount = 4;
+    shardCount = 4;
+    fixShardCount = true;
+
+    super.setUp();
+  }
+
+  // Number of rebalance rounds; doTest() overwrites this default with a random value in [1, 9].
+  int reps = 10;
+  // Upper bound, in milliseconds, for each of the polling loops in this test.
+  int timeoutMs = 60000;
+  // Shard key -> replicas of that shard, as captured by recordInitialState().
+  Map<String, List<Replica>> initial = new HashMap<>();
+
+  // Shard key -> replica chosen as preferred leader for the current round (filled by issueCommands()).
+  Map<String, Replica> expected = new HashMap<>();
+
+
+  /**
+   * Creates the test collection with a randomized shard count and replication factor, waits for it
+   * to come up, confirms it is listed, and then runs the leader-rebalancing rounds.
+   */
+  @Override
+  public void doTest() throws Exception {
+    CloudSolrServer creationClient = createCloudClient(null);
+    // Always run at least one round.
+    reps = 1 + random().nextInt(9);
+    try {
+      // Vary shards and replicas (each at least 2) in order to exercise boundary cases.
+      final int numShards = Math.max(2, random().nextInt(7));
+      final int replicationFactor = Math.max(2, random().nextInt(4));
+      final int maxReplicasPerNode = numShards * replicationFactor + 1;
+      createCollection(null, COLLECTION_NAME, numShards, replicationFactor, maxReplicasPerNode,
+          creationClient, null, "conf1");
+    } finally {
+      // This client is only needed for creating the collection.
+      creationClient.shutdown();
+    }
+
+    waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2);
+    waitForRecoveriesToFinish(COLLECTION_NAME, false);
+
+    listCollection();
+
+    rebalanceLeaderTest();
+  }
+
+  /**
+   * Sends a LIST request to the collections API and asserts that the control collection, the
+   * default collection and the test collection all appear in the response.
+   *
+   * @throws IOException         if the request cannot be sent
+   * @throws SolrServerException if the server reports an error
+   */
+  @SuppressWarnings("unchecked") // the "collections" entry is read as a list of names
+  private void listCollection() throws IOException, SolrServerException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.LIST.toString());
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+
+    NamedList<Object> rsp = cloudClient.request(request);
+    List<String> collections = (List<String>) rsp.get("collections");
+    // Fail with a clear message rather than a NullPointerException if the entry is missing.
+    assertNotNull("LIST response did not contain a 'collections' entry", collections);
+    assertTrue("control_collection was not found in list", collections.contains("control_collection"));
+    assertTrue(DEFAULT_COLLECTION + " was not found in list", collections.contains(DEFAULT_COLLECTION));
+    assertTrue(COLLECTION_NAME + " was not found in list", collections.contains(COLLECTION_NAME));
+  }
+
+  /**
+   * Copies the replicas of every slice of the test collection into {@code initial}, keyed by the
+   * slice's key in the collection's slice map.
+   */
+  void recordInitialState() throws InterruptedException {
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    Map<String, Slice> sliceMap = clusterState.getCollection(COLLECTION_NAME).getSlicesMap();
+
+    for (Map.Entry<String, Slice> shard : sliceMap.entrySet()) {
+      // Take a private copy so the recorded replicas can be indexed and iterated freely.
+      List<Replica> replicaCopy = new ArrayList<>(shard.getValue().getReplicas());
+      initial.put(shard.getKey(), replicaCopy);
+    }
+  }
+
+  /**
+   * Records the starting replica layout, then runs {@code reps} rounds of "assign preferred
+   * leaders, rebalance, verify".
+   */
+  void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException {
+    recordInitialState();
+
+    int round = 0;
+    while (round < reps) {
+      issueCommands();
+      checkConsistency();
+      round++;
+    }
+  }
+
+  /**
+   * Polls once a second, for up to {@code timeoutMs} milliseconds, until all of the following hold
+   * after a rebalance command; fails the test if they never do:
+   * <ol>
+   *   <li>every replica appears once and only once in its shard's leader election queue,</li>
+   *   <li>every replica we expect to be leader is in position 0 of that queue,</li>
+   *   <li>the leader recorded in ZooKeeper is the one we expect.</li>
+   * </ol>
+   */
+  void checkConsistency() throws InterruptedException, KeeperException {
+    final long deadline = System.currentTimeMillis() + timeoutMs;
+
+    while (System.currentTimeMillis() < deadline) {
+      // Short-circuits in the same order as the list above.
+      boolean consistent = checkAppearOnce() && checkElectionZero() && checkZkLeadersAgree();
+      if (consistent) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    fail("Checking the rebalance leader command failed");
+  }
+
+
+  /**
+   * Checks, for every recorded shard, that its leader election queue and its replica list have the
+   * same size, that every election node maps to a known replica, and that every replica has an
+   * election node.
+   *
+   * @return true when all shards pass, false at the first mismatch
+   */
+  Boolean checkAppearOnce() throws KeeperException, InterruptedException {
+
+    for (Map.Entry<String, List<Replica>> shard : initial.entrySet()) {
+      String electionPath = "/collections/" + COLLECTION_NAME + "/leader_elect/" + shard.getKey() + "/election";
+      List<String> queue = cloudClient.getZkStateReader().getZkClient().getChildren(electionPath, null, true);
+      List<Replica> replicas = shard.getValue();
+
+      if (queue.size() != replicas.size()) {
+        return false;
+      }
+      // Every election node must belong to one of the shard's replicas ...
+      for (String electionNode : queue) {
+        if (!checkReplicaName(LeaderElector.getNodeName(electionNode), replicas)) {
+          return false;
+        }
+      }
+      // ... and every replica must be present in the queue.
+      for (Replica replica : replicas) {
+        if (!checkElectionNode(replica.getName(), queue)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Tells whether any entry of the leader election queue carries the given replica name.
+   *
+   * @param repName     replica name to look for
+   * @param leaderQueue election node names to search
+   * @return true if some election node's embedded node name equals {@code repName}
+   */
+  Boolean checkElectionNode(String repName, List<String> leaderQueue) {
+    boolean found = false;
+    for (int idx = 0; !found && idx < leaderQueue.size(); idx++) {
+      found = repName.equals(LeaderElector.getNodeName(leaderQueue.get(idx)));
+    }
+    return found;
+  }
+
+  /**
+   * Tells whether the given name is the name of one of the supplied replicas.
+   *
+   * @param toCheck  name to look for
+   * @param replicas replicas to search
+   * @return true if some replica's name equals {@code toCheck}
+   */
+  Boolean checkReplicaName(String toCheck, List<Replica> replicas) {
+    boolean known = false;
+    for (int idx = 0; !known && idx < replicas.size(); idx++) {
+      known = toCheck.equals(replicas.get(idx).getName());
+    }
+    return known;
+  }
+
+  /**
+   * Reads the sorted leader election queue for one shard of the test collection.
+   * <p>
+   * Only a single attempt is made here. The node may not be there yet, so on failure this returns
+   * null and the polling caller is expected to try again.
+   *
+   * @param key shard key used in the election path
+   * @return the sorted election nodes, or null if they could not be read
+   */
+  List<String> getOverseerSort(String key) {
+    String electionPath = "/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election";
+    try {
+      return OverseerCollectionProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
+          electionPath);
+    } catch (KeeperException e) {
+      // Re-establish the connection so a later attempt has a chance to succeed.
+      cloudClient.connect();
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller's next blocking call instead of silently dropping it.
+      Thread.currentThread().interrupt();
+    }
+    return null;
+  }
+
+  /**
+   * Checks that every replica we expect to be leader sits in position 0 of its shard's sorted
+   * leader election queue.
+   *
+   * @return true when every expected leader heads its queue; false if a queue is unavailable,
+   *         empty, or headed by a different replica
+   */
+  Boolean checkElectionZero() {
+    for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+
+      List<String> leaderQueue = getOverseerSort(ent.getKey());
+      // An empty queue is treated like an unreadable one: report "not yet" rather than
+      // letting get(0) throw IndexOutOfBoundsException.
+      if (leaderQueue == null || leaderQueue.isEmpty()) {
+        return false;
+      }
+
+      String electName = LeaderElector.getNodeName(leaderQueue.get(0));
+      String coreName = ent.getValue().getName();
+      if (!electName.equals(coreName)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Checks that, for every shard, the "core" recorded in the shard's leader node in ZooKeeper
+   * matches the "core" property of the replica we expect to be leader.
+   *
+   * @return true when all shards agree; false if a leader node cannot be read, has no "core"
+   *         entry, or names a different core
+   */
+  Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
+    for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+
+      String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey();
+      byte[] data = getZkData(cloudClient, path);
+      if (data == null) {
+        return false;
+      }
+
+      Map<?, ?> leaderProps = (Map<?, ?>) ZkStateReader.fromJSON(data);
+      Object zkCore = leaderProps.get("core");
+      String repCore = ent.getValue().getStr("core");
+      // A leader node without a "core" entry counts as disagreement instead of causing an NPE.
+      if (zkCore == null || !zkCore.equals(repCore)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Reads the data stored at a ZooKeeper path through the given client, making a single attempt.
+   *
+   * @param server client whose ZooKeeper connection is used
+   * @param path   ZooKeeper path to read
+   * @return the node's data, or null if the node does not exist, the read fails, or the thread
+   *         is interrupted
+   */
+  byte[] getZkData(CloudSolrServer server, String path) {
+    org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
+    try {
+      return server.getZkStateReader().getZkClient().getData(path, null, stat, true);
+    } catch (KeeperException.NoNodeException e) {
+      // The node is not there (yet); pause before reporting "no data".
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e1) {
+        // Preserve the interrupt for the caller rather than swallowing it.
+        Thread.currentThread().interrupt();
+      }
+    } catch (KeeperException e) {
+      // Any other ZooKeeper failure is reported as "no data".
+    } catch (InterruptedException e) {
+      // Preserve the interrupt for the caller rather than swallowing it.
+      Thread.currentThread().interrupt();
+    }
+    return null;
+  }
+
+  /**
+   * Runs one round of commands: picks a random replica per shard as preferred leader, waits for
+   * those assignments to show up in the cluster state, then issues REBALANCELEADERS for the
+   * test collection.
+   * <p>
+   * The response of the rebalance request is not checked here; the consistency checks that follow
+   * will fail if it did not work.
+   */
+  void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException {
+
+    // Find a replica to make the preferredLeader. NOTE: may be one that's _already_ leader!
+    expected.clear();
+    for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
+      List<Replica> replicas = ent.getValue();
+      // nextInt(bound) is always within [0, bound). Math.abs(nextInt()) % size is not, because
+      // Math.abs(Integer.MIN_VALUE) is still negative and would produce a negative index.
+      Replica rep = replicas.get(random().nextInt(replicas.size()));
+      expected.put(ent.getKey(), rep);
+      issuePreferred(ent.getKey(), rep);
+    }
+
+    if (waitForAllPreferreds() == false) {
+      fail("Waited for timeout for preferredLeader assignments to be made and they werent.");
+    }
+
+    // Now rebalance the leaders.
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
+    params.set("collection", COLLECTION_NAME);
+    params.set("maxAtOnce", "10");
+    SolrRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    cloudClient.request(request);
+  }
+
+  /**
+   * Sends an ADDREPLICAPROP request to the collections API that sets the {@code preferredLeader}
+   * property to {@code true} on the given replica. The response is not inspected.
+   *
+   * @param slice shard the replica belongs to
+   * @param rep   replica to mark
+   */
+  void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException {
+    ModifiableSolrParams addPropParams = new ModifiableSolrParams();
+    addPropParams.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
+
+    // Target: which replica of which shard in which collection.
+    addPropParams.set("collection", COLLECTION_NAME);
+    addPropParams.set("shard", slice);
+    addPropParams.set("replica", rep.getName());
+
+    // Payload: the property and its value.
+    addPropParams.set("property", "preferredLeader");
+    addPropParams.set("property.value", "true");
+
+    SolrRequest addPropRequest = new QueryRequest(addPropParams);
+    addPropRequest.setPath("/admin/collections");
+    cloudClient.request(addPropRequest);
+  }
+
+  /**
+   * Polls the cluster state, for up to {@code timeoutMs} milliseconds, until every replica in
+   * {@code expected} reports the {@code property.preferredleader} flag.
+   *
+   * @return true once all expected replicas carry the flag, false if the timeout elapses first
+   */
+  boolean waitForAllPreferreds() throws KeeperException, InterruptedException {
+    long start = System.currentTimeMillis();
+    while (System.currentTimeMillis() - start < timeoutMs) {
+      cloudClient.getZkStateReader().updateClusterState(true);
+      Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
+
+      boolean allSet = true;
+      for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+        Slice slice = slices.get(ent.getKey());
+        Replica me = (slice == null) ? null : slice.getReplica(ent.getValue().getName());
+        // A slice or replica missing from this snapshot counts as "not yet" instead of an NPE.
+        if (me == null || me.getBool("property.preferredleader", false) == false) {
+          allSet = false;
+          break;
+        }
+      }
+      if (allSet) {
+        return true;
+      }
+      Thread.sleep(250);
+    }
+    return false;
+  }
+
+}
+

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Wed Dec 24 22:55:36 2014
@@ -62,6 +62,7 @@ public class ZkStateReader implements Cl
   public static final String STATE_PROP = "state";
   public static final String CORE_NAME_PROP = "core";
   public static final String COLLECTION_PROP = "collection";
+  public static final String ELECTION_NODE_PROP = "election_node";
   public static final String SHARD_ID_PROP = "shard";
   public static final String REPLICA_PROP = "replica";
   public static final String SHARD_RANGE_PROP = "shard_range";
@@ -78,6 +79,7 @@ public class ZkStateReader implements Cl
   public static final String ALIASES = "/aliases.json";
   public static final String CLUSTER_STATE = "/clusterstate.json";
   public static final String CLUSTER_PROPS = "/clusterprops.json";
+  public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
 
   public static final String REPLICATION_FACTOR = "replicationFactor";
   public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
@@ -102,9 +104,10 @@ public class ZkStateReader implements Cl
 
   private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
 
-  public static final String LEADER_ELECT_ZKNODE = "/leader_elect";
+  public static final String LEADER_ELECT_ZKNODE = "leader_elect";
 
   public static final String SHARD_LEADERS_ZKNODE = "leaders";
+  public static final String ELECTION_NODE = "election";
 
   private final Set<String> watchedCollections = new HashSet<String>();
 
@@ -658,6 +661,16 @@ public class ZkStateReader implements Cl
         : "");
   }
 
+  /**
+   * Builds the ZooKeeper path under which shard leader election nodes are kept.
+   *
+   * @param collection collection name
+   * @param shardId    shard id; when null, the collection-level leader election path is returned
+   * @return the election path of the shard, or the collection's leader election root when
+   *         {@code shardId} is null
+   */
+  public static String getShardLeadersElectPath(String collection, String shardId) {
+    String electRoot = COLLECTIONS_ZKNODE + "/" + collection + "/" + LEADER_ELECT_ZKNODE;
+    if (shardId == null) {
+      return electRoot;
+    }
+    return electRoot + "/" + shardId + "/" + ELECTION_NODE;
+  }
+
+
   public List<ZkCoreNodeProps> getReplicaProps(String collection,
       String shardId, String thisCoreNodeName) {
     return getReplicaProps(collection, shardId, thisCoreNodeName, null);

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java?rev=1647857&r1=1647856&r2=1647857&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java Wed Dec 24 22:55:36 2014
@@ -135,7 +135,8 @@ public abstract class CoreAdminParams
     LOAD_ON_STARTUP,
     TRANSIENT,
     OVERSEEROP,
-    REQUESTSTATUS;
+    REQUESTSTATUS,
+    REJOINLEADERELECTION;
     
     public static CoreAdminAction get( String p )
     {