You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/09/01 18:13:39 UTC

svn commit: r1700603 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/handler/ core/src/java/org/apache/solr/handler/admin/ core/src/test/org/apache/solr/cloud/ solrj/src/java/org/apache/solr/common/clo...

Author: markrmiller
Date: Tue Sep  1 16:13:38 2015
New Revision: 1700603

URL: http://svn.apache.org/r1700603
Log:
SOLR-7844: Zookeeper session expiry during shard leader election can cause multiple leaders.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
    lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Tue Sep  1 16:13:38 2015
@@ -181,6 +181,9 @@ Bug Fixes
 
 * SOLR-7988: SolrJ could not make requests to handlers with '/admin/' prefix (noble , ludovic Boutros)
 
+* SOLR-7844: Zookeeper session expiry during shard leader election can cause multiple leaders.
+  (Mike Roberts, Mark Miller, Jessica Cheng)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Tue Sep  1 16:13:38 2015
@@ -2,10 +2,12 @@ package org.apache.solr.cloud;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
@@ -30,6 +32,11 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,16 +80,16 @@ public abstract class ElectionContext im
   }
   
   public void cancelElection() throws InterruptedException, KeeperException {
-    if( leaderSeqPath != null ){
+    if (leaderSeqPath != null) {
       try {
-        log.info("canceling election {}",leaderSeqPath );
+        log.info("Canceling election {}", leaderSeqPath);
         zkClient.delete(leaderSeqPath, -1, true);
       } catch (NoNodeException e) {
         // fine
-        log.warn("cancelElection did not find election node to remove {}" ,leaderSeqPath);
+        log.info("cancelElection did not find election node to remove {}", leaderSeqPath);
       }
     } else {
-      log.warn("cancelElection skipped as this context has not been initialized");
+      log.info("cancelElection skipped as this context has not been initialized");
     }
   }
 
@@ -104,6 +111,7 @@ class ShardLeaderElectionContextBase ext
   protected String shardId;
   protected String collection;
   protected LeaderElector leaderElector;
+  protected volatile Integer leaderZkNodeParentVersion;
   
   public ShardLeaderElectionContextBase(LeaderElector leaderElector,
       final String shardId, final String collection, final String coreNodeName,
@@ -129,25 +137,81 @@ class ShardLeaderElectionContextBase ext
   }
   
   @Override
+  public void cancelElection() throws InterruptedException, KeeperException {
+    if (leaderZkNodeParentVersion != null) {
+      try {
+        // We need to be careful and make sure we *only* delete our own leader registration node.
+        // We do this by using a multi and ensuring the parent znode of the leader registration node
+        // matches the version we expect - there is a setData call that increments the parent's znode
+        // version whenever a leader registers.
+        log.info("Removing leader registration node on cancel: {} {}", leaderPath, leaderZkNodeParentVersion);
+        List<Op> ops = new ArrayList<>(2);
+        ops.add(Op.check(new Path(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+        ops.add(Op.delete(leaderPath, -1));
+        zkClient.multi(ops, true);
+      } catch (KeeperException.NoNodeException nne) {
+        // no problem
+        log.info("No leader registration node found to remove: {}", leaderPath);
+      } catch (KeeperException.BadVersionException bve) {
+        log.info("Cannot remove leader registration node because the current registered node is not ours: {}", leaderPath);
+        // no problem
+      } catch (InterruptedException e) {
+        throw e;
+      } catch (Exception e) {
+        SolrException.log(log, e);
+      }
+      leaderZkNodeParentVersion = null;
+    } else {
+      log.info("No version found for ephemeral leader parent node, won't remove previous leader registration.");
+    }
+    super.cancelElection();
+  }
+  
+  @Override
   void runLeaderProcess(boolean weAreReplacement, int pauseBeforeStartMs)
       throws KeeperException, InterruptedException, IOException {
-    // register as leader - if an ephemeral is already there, wait just a bit
-    // to see if it goes away
+    // register as leader - if an ephemeral is already there, wait to see if it goes away
+    String parent = new Path(leaderPath).getParent().toString();
+    ZkCmdExecutor zcmd = new ZkCmdExecutor(30000);
+    zcmd.ensureExists(parent, zkClient);
+
     try {
-      RetryUtil.retryOnThrowable(NodeExistsException.class, 15000, 1000,
-          new RetryCmd() {
-            @Override
-            public void execute() throws Throwable {
-              zkClient.makePath(leaderPath, Utils.toJSON(leaderProps), CreateMode.EPHEMERAL, true);
+      RetryUtil.retryOnThrowable(NodeExistsException.class, 60000, 5000, new RetryCmd() {
+        
+        @Override
+        public void execute() throws InterruptedException, KeeperException {
+          log.info("Creating leader registration node", leaderPath);
+          List<Op> ops = new ArrayList<>(2);
+          
+          // We use a multi operation to get the parent nodes version, which will
+          // be used to make sure we only remove our own leader registration node.
+          // The setData call used to get the parent version is also the trigger to
+          // increment the version. We also do a sanity check that our leaderSeqPath exists.
+          
+          ops.add(Op.check(leaderSeqPath, -1));
+          ops.add(Op.create(leaderPath, Utils.toJSON(leaderProps), zkClient.getZkACLProvider().getACLsToAdd(leaderPath), CreateMode.EPHEMERAL));
+          ops.add(Op.setData(parent, null, -1));
+          List<OpResult> results;
+          
+          results = zkClient.multi(ops, true);
+          
+          for (OpResult result : results) {
+            if (result.getType() == ZooDefs.OpCode.setData) {
+              SetDataResult dresult = (SetDataResult) result;
+              Stat stat = dresult.getStat();
+              leaderZkNodeParentVersion = stat.getVersion();
+              return;
             }
           }
-      );
+          assert leaderZkNodeParentVersion != null;
+        }
+      });
     } catch (Throwable t) {
       if (t instanceof OutOfMemoryError) {
         throw (OutOfMemoryError) t;
       }
       throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed", t);
-    }
+    } 
     
     assert shardId != null;
     ZkNodeProps m = ZkNodeProps.fromKeyVals(Overseer.QUEUE_OPERATION,
@@ -158,6 +222,10 @@ class ShardLeaderElectionContextBase ext
         leaderProps.getProperties().get(ZkStateReader.CORE_NAME_PROP),
         ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
     Overseer.getInQueue(zkClient).offer(Utils.toJSON(m));
+  }
+
+  public LeaderElector getLeaderElector() {
+    return leaderElector;
   }  
 }
 
@@ -203,7 +271,6 @@ final class ShardLeaderElectionContext e
     ActionThrottle lt;
     try (SolrCore core = cc.getCore(coreName)) {
       if (core == null) {
-        cancelElection();
         throw new SolrException(ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
       }
       MDCLoggingContext.setCore(core);
@@ -225,6 +292,13 @@ final class ShardLeaderElectionContext e
         waitForReplicasToComeUp(leaderVoteWait);
       }
       
+      if (isClosed) {
+        // Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the later, 
+        // we cannot be sure we are still the leader, so we should bail out. The OnReconnect handler will 
+        // re-register the cores and handle a new leadership election.
+        return;
+      }
+      
       try (SolrCore core = cc.getCore(coreName)) {
         
         if (core == null) {
@@ -312,34 +386,38 @@ final class ShardLeaderElectionContext e
       }
       
       boolean isLeader = true;
-      try {
-        super.runLeaderProcess(weAreReplacement, 0);
-      } catch (Exception e) {
-        isLeader = false;
-        SolrException.log(log, "There was a problem trying to register as the leader", e);
-        
-        try (SolrCore core = cc.getCore(coreName)) {
+      if (!isClosed) {
+        try {
+          super.runLeaderProcess(weAreReplacement, 0);
+        } catch (Exception e) {
+          isLeader = false;
+          SolrException.log(log, "There was a problem trying to register as the leader", e);
           
-          if (core == null) {
-            log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
-            return;
+          try (SolrCore core = cc.getCore(coreName)) {
+            
+            if (core == null) {
+              log.debug("SolrCore not found:" + coreName + " in " + cc.getCoreNames());
+              return;
+            }
+            
+            core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
+            
+            // we could not publish ourselves as leader - try and rejoin election
+            rejoinLeaderElection(core);
           }
-          
-          core.getCoreDescriptor().getCloudDescriptor().setLeader(false);
-          
-          // we could not publish ourselves as leader - try and rejoin election
-          rejoinLeaderElection(core);
         }
-      }
-      
-      if (isLeader) {
-        // check for any replicas in my shard that were set to down by the previous leader
-        try {
-          startLeaderInitiatedRecoveryOnReplicas(coreName);
-        } catch (Exception exc) {
-          // don't want leader election to fail because of
-          // an error trying to tell others to recover
+        
+        if (isLeader) {
+          // check for any replicas in my shard that were set to down by the previous leader
+          try {
+            startLeaderInitiatedRecoveryOnReplicas(coreName);
+          } catch (Exception exc) {
+            // don't want leader election to fail because of
+            // an error trying to tell others to recover
+          }
         }
+      } else {
+        cancelElection();
       }
     } finally {
       MDCLoggingContext.clear();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java Tue Sep  1 16:13:38 2015
@@ -22,10 +22,12 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.solr.cloud.ZkController.ContextKey;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
@@ -70,11 +72,21 @@ public  class LeaderElector {
 
   private ElectionWatcher watcher;
 
+  private Map<ContextKey,ElectionContext> electionContexts;
+  private ContextKey contextKey;
+
   public LeaderElector(SolrZkClient zkClient) {
     this.zkClient = zkClient;
     zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
   }
   
+  public LeaderElector(SolrZkClient zkClient, ContextKey key, Map<ContextKey,ElectionContext> electionContexts) {
+    this.zkClient = zkClient;
+    zkCmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
+    this.electionContexts = electionContexts;
+    this.contextKey = key;
+  }
+
   public ElectionContext getContext() {
     return context;
   }
@@ -140,20 +152,6 @@ public  class LeaderElector {
         retryElection(context, false);//join at the tail again
         return;
       }
-      // first we delete the node advertising the old leader in case the ephem is still there
-      try {
-        zkClient.delete(context.leaderPath, -1, true);
-      }catch (KeeperException.NoNodeException nne){
-        //no problem
-      }catch (InterruptedException e){
-        throw e;
-      } catch (Exception e) {
-        //failed to delete the leader node
-        log.error("leader elect delete error",e);
-        retryElection(context, false);
-        return;
-        // fine
-      }
 
       try {
         runIamLeaderProcess(context, replacement);
@@ -423,6 +421,9 @@ public  class LeaderElector {
   void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
     ElectionWatcher watcher = this.watcher;
     ElectionContext ctx = context.copy();
+    if (electionContexts != null) {
+      electionContexts.put(contextKey, ctx);
+    }
     if (watcher != null) watcher.cancel();
     this.context.cancelElection();
     this.context = ctx;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java Tue Sep  1 16:13:38 2015
@@ -97,7 +97,7 @@ import static org.apache.solr.common.clo
 import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
@@ -309,7 +309,7 @@ public class OverseerCollectionMessageHa
   @SuppressWarnings("unchecked")
   private void processRebalanceLeaders(ZkNodeProps message) throws KeeperException, InterruptedException {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, CORE_NAME_PROP, ELECTION_NODE_PROP,
-        NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
+        CORE_NODE_NAME_PROP, BASE_URL_PROP, REJOIN_AT_HEAD_PROP);
 
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set(COLLECTION_PROP, message.getStr(COLLECTION_PROP));
@@ -317,7 +317,7 @@ public class OverseerCollectionMessageHa
     params.set(REJOIN_AT_HEAD_PROP, message.getStr(REJOIN_AT_HEAD_PROP));
     params.set(CoreAdminParams.ACTION, CoreAdminAction.REJOINLEADERELECTION.toString());
     params.set(CORE_NAME_PROP, message.getStr(CORE_NAME_PROP));
-    params.set(NODE_NAME_PROP, message.getStr(NODE_NAME_PROP));
+    params.set(CORE_NODE_NAME_PROP, message.getStr(CORE_NODE_NAME_PROP));
     params.set(ELECTION_NODE_PROP, message.getStr(ELECTION_NODE_PROP));
     params.set(BASE_URL_PROP, message.getStr(BASE_URL_PROP));
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Sep  1 16:13:38 2015
@@ -170,8 +170,6 @@ public final class ZkController {
   private final ZkCmdExecutor cmdExecutor;
   private final ZkStateReader zkStateReader;
 
-  private final LeaderElector leaderElector;
-
   private final String zkServerAddress;          // example: 127.0.0.1:54062/solr
 
   private final int localHostPort;      // example: 54065
@@ -372,6 +370,7 @@ public final class ZkController {
         } catch (Exception e) {
           log.error("Error trying to stop any Overseer threads", e);
         }
+        closeOutstandingElections(registerOnReconnect);
         markAllAsNotLeader(registerOnReconnect);
       }
     }, zkACLProvider);
@@ -383,7 +382,6 @@ public final class ZkController {
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
     cmdExecutor = new ZkCmdExecutor(clientTimeout);
-    leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient, new Runnable() {
       @Override
       public void run() {
@@ -480,6 +478,32 @@ public final class ZkController {
       }
     }
   }
+  
+  private void closeOutstandingElections(final CurrentCoreDescriptorProvider registerOnReconnect) {
+    
+    List<CoreDescriptor> descriptors = registerOnReconnect.getCurrentDescriptors();
+    if (descriptors != null) {
+      for (CoreDescriptor descriptor : descriptors) {
+        closeExistingElectionContext(descriptor);
+      }
+    }
+  }
+  
+  private ContextKey closeExistingElectionContext(CoreDescriptor cd) {
+    // look for old context - if we find it, cancel it
+    String collection = cd.getCloudDescriptor().getCollectionName();
+    final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+    
+    ContextKey contextKey = new ContextKey(collection, coreNodeName);
+    ElectionContext prevContext = electionContexts.get(contextKey);
+    
+    if (prevContext != null) {
+      prevContext.close();
+      electionContexts.remove(contextKey);
+    }
+    
+    return contextKey;
+  }
 
   private void markAllAsNotLeader(
       final CurrentCoreDescriptorProvider registerOnReconnect) {
@@ -1068,11 +1092,12 @@ public final class ZkController {
     props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
     props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
 
 
     ZkNodeProps ourProps = new ZkNodeProps(props);
 
-
+    LeaderElector leaderElector = new LeaderElector(zkClient, contextKey, electionContexts);
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
         collection, coreNodeName, ourProps, this, cc);
 
@@ -1876,23 +1901,36 @@ public final class ZkController {
 
   public void rejoinShardLeaderElection(SolrParams params) {
     try {
+      
       String collectionName = params.get(COLLECTION_PROP);
       String shardId = params.get(SHARD_ID_PROP);
-      String nodeName = params.get(NODE_NAME_PROP);
+      String coreNodeName = params.get(CORE_NODE_NAME_PROP);
       String coreName = params.get(CORE_NAME_PROP);
       String electionNode = params.get(ELECTION_NODE_PROP);
       String baseUrl = params.get(BASE_URL_PROP);
 
-      ZkNodeProps zkProps = new ZkNodeProps(CORE_NAME_PROP, coreName, NODE_NAME_PROP, nodeName, COLLECTION_PROP, collectionName,
-          SHARD_ID_PROP, shardId, ELECTION_NODE_PROP, electionNode, BASE_URL_PROP, baseUrl);
-
-      ShardLeaderElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId, collectionName,
-          nodeName, zkProps, this, getCoreContainer());
-      LeaderElector elect = new LeaderElector(this.zkClient);
-      context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
-      elect.setup(context);
-
-      elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+      try (SolrCore core = cc.getCore(coreName)) {
+        MDCLoggingContext.setCore(core);
+        
+        log.info("Rejoin the shard leader election.");
+        
+        ContextKey contextKey = new ContextKey(collectionName, coreNodeName);
+        
+        ElectionContext prevContext = electionContexts.get(contextKey);
+        if (prevContext != null) prevContext.cancelElection();
+        
+        ZkNodeProps zkProps = new ZkNodeProps(BASE_URL_PROP, baseUrl, CORE_NAME_PROP, coreName, NODE_NAME_PROP, getNodeName(), CORE_NODE_NAME_PROP, coreNodeName);
+            
+        LeaderElector elect = ((ShardLeaderElectionContextBase) prevContext).getLeaderElector();
+        ShardLeaderElectionContext context = new ShardLeaderElectionContext(elect, shardId, collectionName,
+            coreNodeName, zkProps, this, getCoreContainer());
+            
+        context.leaderSeqPath = context.electionPath + LeaderElector.ELECTION_NODE + "/" + electionNode;
+        elect.setup(context);
+        electionContexts.put(contextKey, context);
+        
+        elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP));
+      }
     } catch (Exception e) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
     }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/CdcrLeaderStateManager.java Tue Sep  1 16:13:38 2015
@@ -98,7 +98,7 @@ class CdcrLeaderStateManager extends Cdc
   private String getZnodePath() {
     String myShardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
     String myCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
-    return "/collections/" + myCollection + "/leaders/" + myShardId;
+    return "/collections/" + myCollection + "/leaders/" + myShardId + "/leader";
   }
 
   void setAmILeader(boolean amILeader) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java Tue Sep  1 16:13:38 2015
@@ -308,7 +308,7 @@ public class CoreAdminHandler extends Re
           if (zkController != null) {
             zkController.rejoinShardLeaderElection(req.getParams());
           } else {
-            log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELCTIONS. No action taken.");
+            log.warn("zkController is null in CoreAdminHandler.handleRequestInternal:REJOINLEADERELECTION. No action taken.");
           }
           break;
         case INVOKE:

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java Tue Sep  1 16:13:38 2015
@@ -1,5 +1,18 @@
 package org.apache.solr.handler.admin;
 
+import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
+import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -39,21 +52,12 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.zookeeper.KeeperException;
-
-import static org.apache.solr.cloud.Overseer.QUEUE_OPERATION;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.LEADER_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_AT_ONCE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.MAX_WAIT_SECONDS_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
-import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class RebalanceLeaders {
+  private static Logger log = LoggerFactory.getLogger(RebalanceLeaders.class);
+  
   final SolrQueryRequest req;
   final SolrQueryResponse rsp;
   final CollectionsHandler collectionsHandler;
@@ -72,7 +76,7 @@ class RebalanceLeaders {
     String collectionName = req.getParams().get(COLLECTION_PROP);
     if (StringUtils.isBlank(collectionName)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the REASSIGNLEADERS command."));
+          String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
     }
     coreContainer.getZkController().getZkStateReader().updateClusterState();
     ClusterState clusterState = coreContainer.getZkController().getClusterState();
@@ -88,9 +92,9 @@ class RebalanceLeaders {
 
     boolean keepGoing = true;
     for (Slice slice : dc.getSlices()) {
-      insurePreferredIsLeader(results, slice, currentRequests);
+      ensurePreferredIsLeader(results, slice, currentRequests);
       if (currentRequests.size() == max) {
-        CollectionsHandler.log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
+        log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
         keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
         if (keepGoing == false) {
           break; // If we've waited longer than specified, don't continue to wait!
@@ -101,15 +105,15 @@ class RebalanceLeaders {
       keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
     }
     if (keepGoing == true) {
-      CollectionsHandler.log.info("All leader reassignments completed.");
+      log.info("All leader reassignments completed.");
     } else {
-      CollectionsHandler.log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
+      log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
     }
 
     rsp.getValues().addAll(results);
   }
 
-  private void insurePreferredIsLeader(NamedList<Object> results,
+  private void ensurePreferredIsLeader(NamedList<Object> results,
                                        Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
     final String inactivePreferreds = "inactivePreferreds";
     final String alreadyLeaders = "alreadyLeaders";
@@ -164,8 +168,8 @@ class RebalanceLeaders {
           ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
       if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
-        CollectionsHandler.log.warn("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
-            "election queue, but replica " + replica.getName() + " doesn't think it's the leader. Do nothing");
+        log.info("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
+            "election queue, but replica " + replica.getName() + " doesn't think it's the leader.");
         return;
       }
 
@@ -210,10 +214,6 @@ class RebalanceLeaders {
       return; // let's not continue if we didn't get what we expect. Possibly we're offline etc..
     }
 
-    List<String> electionNodesTmp = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
-        ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
-
-
     // Now find other nodes that have the same sequence number as this node and re-queue them at the end of the queue.
     electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
         ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
@@ -250,6 +250,7 @@ class RebalanceLeaders {
     }
     return -1;
   }
+  
   private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
                               boolean rejoinAtHead) throws KeeperException, InterruptedException {
     Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
@@ -258,7 +259,7 @@ class RebalanceLeaders {
     propMap.put(SHARD_ID_PROP, slice.getName());
     propMap.put(QUEUE_OPERATION, REBALANCELEADERS.toLower());
     propMap.put(CORE_NAME_PROP, core);
-    propMap.put(NODE_NAME_PROP, replica.getName());
+    propMap.put(CORE_NODE_NAME_PROP, replica.getName());
     propMap.put(ZkStateReader.BASE_URL_PROP, replica.getProperties().get(ZkStateReader.BASE_URL_PROP));
     propMap.put(REJOIN_AT_HEAD_PROP, Boolean.toString(rejoinAtHead)); // Get ourselves to be first in line.
     propMap.put(ELECTION_NODE_PROP, electionNode);

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java Tue Sep  1 16:13:38 2015
@@ -16,6 +16,13 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -29,17 +36,12 @@ import org.apache.solr.common.util.Utils
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
-
+  private static Logger log = LoggerFactory.getLogger(TestRebalanceLeaders.class);
   public static final String COLLECTION_NAME = "testcollection";
 
   public TestRebalanceLeaders() {
@@ -71,7 +73,6 @@ public class TestRebalanceLeaders extend
     waitForRecoveriesToFinish(COLLECTION_NAME, false);
 
     listCollection();
-
     rebalanceLeaderTest();
   }
 
@@ -117,16 +118,21 @@ public class TestRebalanceLeaders extend
   // 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader.
   void checkConsistency() throws InterruptedException, KeeperException {
     TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS);
-
-    while (! timeout.hasTimedOut()) {
-      if (checkAppearOnce() &&
-          checkElectionZero() &&
-          checkZkLeadersAgree()) {
+    boolean checkAppearOnce = false;
+    boolean checkElectionZero = false;
+    boolean checkZkLeadersAgree = false;
+    while (!timeout.hasTimedOut()) {
+      checkAppearOnce = checkAppearOnce();
+      checkElectionZero = checkElectionZero();
+      checkZkLeadersAgree = checkZkLeadersAgree();
+      if (checkAppearOnce && checkElectionZero && checkZkLeadersAgree) {
         return;
       }
       Thread.sleep(1000);
     }
-    fail("Checking the rebalance leader command failed");
+
+    fail("Checking the rebalance leader command failed, checkAppearOnce=" + checkAppearOnce + " checkElectionZero="
+        + checkElectionZero + " checkZkLeadersAgree=" + checkZkLeadersAgree);
   }
 
 
@@ -211,25 +217,26 @@ public class TestRebalanceLeaders extend
 
   // Do who we _think_ should be the leader agree with the leader nodes?
   Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
-    for (Map.Entry<String, Replica> ent : expected.entrySet()) {
-
-      String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey();
+    for (Map.Entry<String,Replica> ent : expected.entrySet()) {
+      
+      String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey() + "/leader";
       byte[] data = getZkData(cloudClient, path);
-      if (data == null) return false;
-
+      if (data == null) {
+        log.warn("path to check not found {}", path);
+        return false;
+      }
+      
       String repCore = null;
       String zkCore = null;
-
-      if (data == null) {
+      
+      Map m = (Map) Utils.fromJSON(data);
+      zkCore = (String) m.get("core");
+      repCore = ent.getValue().getStr("core");
+      if (zkCore.equals(repCore) == false) {
+        log.warn("leader in zk does not match what we expect: {} != {}", zkCore, repCore);
         return false;
-      } else {
-        Map m = (Map) Utils.fromJSON(data);
-        zkCore = (String) m.get("core");
-        repCore = ent.getValue().getStr("core");
-        if (zkCore.equals(repCore) == false) {
-          return false;
-        }
       }
+      
     }
     return true;
   }

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java Tue Sep  1 16:13:38 2015
@@ -643,7 +643,7 @@ public class ZkStateReader implements Cl
   public static String getShardLeadersPath(String collection, String shardId) {
     return COLLECTIONS_ZKNODE + "/" + collection + "/"
         + SHARD_LEADERS_ZKNODE + (shardId != null ? ("/" + shardId)
-        : "");
+        : "") + "/leader";
   }
 
   /**

Modified: lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java?rev=1700603&r1=1700602&r2=1700603&view=diff
==============================================================================
--- lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java (original)
+++ lucene/dev/trunk/solr/solrj/src/java/org/apache/solr/common/util/RetryUtil.java Tue Sep  1 16:13:38 2015
@@ -1,5 +1,8 @@
 package org.apache.solr.common.util;
 
+import java.util.Collections;
+import java.util.Set;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -19,18 +22,34 @@ package org.apache.solr.common.util;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class RetryUtil {
+  private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+  
   public static interface RetryCmd {
     public void execute() throws Throwable;
   }
   
+  public static interface BooleanRetryCmd {
+    public boolean execute();
+  }
+  
   public static void retryOnThrowable(Class clazz, long timeoutms, long intervalms, RetryCmd cmd) throws Throwable {
+    retryOnThrowable(Collections.singleton(clazz), timeoutms, intervalms, cmd);
+  }
+  
+  public static void retryOnThrowable(Set<Class> classes, long timeoutms, long intervalms, RetryCmd cmd) throws Throwable {
     long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
     while (true) {
       try {
         cmd.execute();
       } catch (Throwable t) {
-        if (clazz.isInstance(t) && System.nanoTime() < timeout) {
+        if (isInstanceOf(classes, t) && System.nanoTime() < timeout) {
+          log.info("Retry due to Throwable, " + t.getClass().getName() + " " + t.getMessage());
           Thread.sleep(intervalms);
           continue;
         }
@@ -40,4 +59,29 @@ public class RetryUtil {
       break;
     }
   }
+  
+  private static boolean isInstanceOf(Set<Class> classes, Throwable t) {
+    for (Class c : classes) {
+      if (c.isInstance(t)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  public static void retryOnBoolean(long timeoutms, long intervalms, BooleanRetryCmd cmd) {
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
+    while (true) {
+      boolean resp = cmd.execute();
+      if (!resp && System.nanoTime() < timeout) {
+        continue;
+      } else if (System.nanoTime() >= timeout) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Timed out while retrying operation");
+      }
+      
+      // success
+      break;
+    }
+  }
+  
 }