You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/09/09 20:07:46 UTC

svn commit: r1702067 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/

Author: shalin
Date: Wed Sep  9 18:07:45 2015
New Revision: 1702067

URL: http://svn.apache.org/r1702067
Log:
SOLR-7819: ZK connection loss or session timeout do not stall indexing threads anymore and LIR activity is moved to a background thread

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep  9 18:07:45 2015
@@ -182,6 +182,10 @@ Bug Fixes
 * SOLR-8001: Fixed bugs in field(foo,min) and field(foo,max) when some docs have no values
   (David Smiley, hossman)
 
+* SOLR-7819: ZK connection loss or session timeout do not stall indexing threads anymore. All activity
+  related to leader initiated recovery is performed by a dedicated LIR thread in the background.
+  (Ramkumar Aiyengar, shalin)
+
 
 Optimizations
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep  9 18:07:45 2015
@@ -4,7 +4,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.Path;
@@ -467,20 +466,9 @@ final class ShardLeaderElectionContext e
                 }
               }
               
-              LeaderInitiatedRecoveryThread lirThread = 
-                  new LeaderInitiatedRecoveryThread(zkController,
-                                                    cc,
-                                                    collection,
-                                                    shardId,
-                                                    coreNodeProps,
-                                                    120,
-                                                    coreNodeName);
-              zkController.ensureReplicaInLeaderInitiatedRecovery(
+              zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
                   collection, shardId, coreNodeProps, coreNodeName,
-                  false /* forcePublishState */, true /* retryOnConnLoss */);
-
-              ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
-              executor.execute(lirThread);
+                  false /* forcePublishState */);
             }              
           }
         }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Sep  9 18:07:45 2015
@@ -8,9 +8,12 @@ import org.apache.solr.common.SolrExcept
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
+import org.apache.zookeeper.KeeperException;
 import org.apache.solr.util.RTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,24 +78,108 @@ public class LeaderInitiatedRecoveryThre
   
   public void run() {
     RTimer timer = new RTimer();
-    try {
-      sendRecoveryCommandWithRetry();
-    } catch (Exception exc) {
-      log.error(getName()+" failed due to: "+exc, exc);
-      if (exc instanceof SolrException) {
-        throw (SolrException)exc;
-      } else {
-        throw new SolrException(ErrorCode.SERVER_ERROR, exc);
+
+    String replicaCoreName = nodeProps.getCoreName();
+    String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
+    String replicaNodeName = nodeProps.getNodeName();
+    final String replicaUrl = nodeProps.getCoreUrl();
+
+    if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) {
+      throw new SolrException(ErrorCode.INVALID_STATE, "Replica: " + replicaUrl + " should have been marked under leader initiated recovery in ZkController but wasn't.");
+    }
+
+    boolean sendRecoveryCommand = publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, false);
+
+    if (sendRecoveryCommand)  {
+      try {
+        sendRecoveryCommandWithRetry();
+      } catch (Exception exc) {
+        log.error(getName()+" failed due to: "+exc, exc);
+        if (exc instanceof SolrException) {
+          throw (SolrException)exc;
+        } else {
+          throw new SolrException(ErrorCode.SERVER_ERROR, exc);
+        }
+      } finally {
+        zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
       }
+    } else  {
+      // replica is no longer in recovery on this node (may be handled on another node)
+      zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
     }
     log.info("{} completed successfully after running for {}ms", getName(), timer.getTime());
   }
-  
+
+  public boolean publishDownState(String replicaCoreName, String replicaCoreNodeName, String replicaNodeName, String replicaUrl, boolean forcePublishState) {
+    boolean sendRecoveryCommand = true;
+    boolean publishDownState = false;
+
+    if (zkController.getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+      try {
+        // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
+        updateLIRState(replicaCoreNodeName);
+
+        log.info("Put replica core={} coreNodeName={} on " +
+            replicaNodeName + " into leader-initiated recovery.", replicaCoreName, replicaCoreNodeName);
+        publishDownState = true;
+      } catch (Exception e) {
+        Throwable setLirZnodeFailedCause = SolrException.getRootCause(e);
+        log.error("Leader failed to set replica " +
+            nodeProps.getCoreUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
+        if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException
+            || setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException
+            || setLirZnodeFailedCause instanceof ZkController.NotLeaderException) {
+          // our session is expired, which means our state is suspect, so don't go
+          // putting other replicas in recovery (see SOLR-6511)
+          sendRecoveryCommand = false;
+          forcePublishState = false; // no need to force publish any state in this case
+        } // else will go ahead and try to send the recovery command once after this error
+      }
+    } else  {
+      log.info("Node " + replicaNodeName +
+              " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
+          replicaCoreName, replicaCoreNodeName);
+      // publishDownState will be false to avoid publishing the "down" state too many times
+      // as many errors can occur together and will each call into this method (SOLR-6189)
+      forcePublishState = false; // no need to force publish the state because replica is not live
+      sendRecoveryCommand = false; // no need to send recovery messages as well
+    }
+
+    try {
+      if (publishDownState || forcePublishState) {
+        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, nodeProps.getBaseUrl(),
+            ZkStateReader.CORE_NAME_PROP, nodeProps.getCoreName(),
+            ZkStateReader.NODE_NAME_PROP, nodeProps.getNodeName(),
+            ZkStateReader.SHARD_ID_PROP, shardId,
+            ZkStateReader.COLLECTION_PROP, collection);
+        log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}",
+            replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
+        zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
+      }
+    } catch (Exception e) {
+      log.error("Could not publish 'down' state for replicaUrl: {}", replicaUrl, e);
+    }
+
+    return sendRecoveryCommand;
+  }
+
+  /*
+  protected scope for testing purposes
+   */
+  protected void updateLIRState(String replicaCoreNodeName) {
+    zkController.updateLeaderInitiatedRecoveryState(collection,
+        shardId,
+        replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName, true);
+  }
+
   protected void sendRecoveryCommandWithRetry() throws Exception {    
     int tries = 0;
     long waitBetweenTriesMs = 5000L;
     boolean continueTrying = true;
-        
+
+    String replicaCoreName = nodeProps.getCoreName();
     String recoveryUrl = nodeProps.getBaseUrl();
     String replicaNodeName = nodeProps.getNodeName();
     String coreNeedingRecovery = nodeProps.getCoreName();
@@ -224,11 +311,8 @@ public class LeaderInitiatedRecoveryThre
                         // OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
                         // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
                         log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
-                            + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
-                        zkController.ensureReplicaInLeaderInitiatedRecovery(
-                            collection, shardId, nodeProps, leaderCoreNodeName,
-                            true /* forcePublishState */, true /* retryOnConnLoss */
-                        );
+                            + " forcing it back to down state to re-run the leader-initiated recovery process; props: " + replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
+                        publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, true);
                       }
                     }
                     break;

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Sep  9 18:07:45 2015
@@ -17,8 +17,6 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import static org.apache.solr.common.cloud.ZkStateReader.*;
-
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
@@ -46,6 +44,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Strings;
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
@@ -98,8 +97,16 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
-import com.google.common.base.Strings;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
 
 /**
  * Handle ZooKeeper interactions.
@@ -1198,7 +1205,7 @@ public final class ZkController {
           if (state == Replica.State.ACTIVE) {
             // trying to become active, so leader-initiated state must be recovering
             if (lirState == Replica.State.RECOVERING) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null, true);
             } else if (lirState == Replica.State.DOWN) {
               throw new SolrException(ErrorCode.INVALID_STATE,
                   "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
@@ -1206,7 +1213,7 @@ public final class ZkController {
           } else if (state == Replica.State.RECOVERING) {
             // if it is currently DOWN, then trying to enter into recovering state is good
             if (lirState == Replica.State.DOWN) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null, true);
             }
           }
         }
@@ -1972,8 +1979,9 @@ public final class ZkController {
    * to it.
    */
   public boolean ensureReplicaInLeaderInitiatedRecovery(
+      final CoreContainer container,
       final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
-      String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
+      String leaderCoreNodeName, boolean forcePublishState)
       throws KeeperException, InterruptedException {
     final String replicaUrl = replicaCoreProps.getCoreUrl();
 
@@ -1991,7 +1999,6 @@ public final class ZkController {
     // about the same replica having trouble and we only need to send the "needs"
     // recovery signal once
     boolean nodeIsLive = true;
-    boolean publishDownState = false;
     String replicaNodeName = replicaCoreProps.getNodeName();
     String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
     assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
@@ -2003,16 +2010,30 @@ public final class ZkController {
         }
       }
 
-      // if the replica's state is not DOWN right now, make it so ...
-      // we only really need to try to send the recovery command if the node itself is "live"
+      // we only really need to try to start the LIR process if the node itself is "live"
       if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+
+        LeaderInitiatedRecoveryThread lirThread =
+            new LeaderInitiatedRecoveryThread(this,
+                container,
+                collection,
+                shardId,
+                replicaCoreProps,
+                120,
+                leaderCoreNodeName); // core node name of current leader
+        ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
+        try {
+          MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
+          executor.execute(lirThread);
+        } finally {
+          MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
+        }
+
         // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName);
         replicasInLeaderInitiatedRecovery.put(replicaUrl,
             getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
         log.info("Put replica core={} coreNodeName={} on " +
             replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
-        publishDownState = true;
       } else {
         nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
         log.info("Node " + replicaNodeName +
@@ -2023,20 +2044,6 @@ public final class ZkController {
       }
     }
 
-    if (publishDownState || forcePublishState) {
-      String replicaCoreName = replicaCoreProps.getCoreName();
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
-          ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
-          ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
-          ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
-          ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
-          ZkStateReader.SHARD_ID_PROP, shardId,
-          ZkStateReader.COLLECTION_PROP, collection);
-      log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? " + forcePublishState,
-          replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
-      overseerJobQueue.offer(Utils.toJSON(m));
-    }
-
     return nodeIsLive;
   }
 
@@ -2107,8 +2114,8 @@ public final class ZkController {
     return stateObj;
   }
 
-  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, 
-      Replica.State state, String leaderCoreNodeName) {
+  public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
+      Replica.State state, String leaderCoreNodeName, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "
           + state.toString() + " using: collection=" + collection
@@ -2121,7 +2128,7 @@ public final class ZkController {
     if (state == Replica.State.ACTIVE) {
       // since we're marking it active, we don't need this znode anymore, so delete instead of update
       try {
-        zkClient.delete(znodePath, -1, false);
+        zkClient.delete(znodePath, -1, retryOnConnLoss);
       } catch (Exception justLogIt) {
         log.warn("Failed to delete znode " + znodePath, justLogIt);
       }
@@ -2134,24 +2141,30 @@ public final class ZkController {
     } catch (Exception exc) {
       log.warn(exc.getMessage(), exc);
     }
-    if (stateObj == null)
+    if (stateObj == null) {
       stateObj = Utils.makeMap();
+    }
 
     stateObj.put(ZkStateReader.STATE_PROP, state.toString());
     // only update the createdBy value if it's not set
-    if (stateObj.get("createdByNodeName") == null)
-      stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
+    if (stateObj.get("createdByNodeName") == null) {
+      stateObj.put("createdByNodeName", this.nodeName);
+    }
+    if (stateObj.get("createdByCoreNodeName") == null && leaderCoreNodeName != null)  {
+      stateObj.put("createdByCoreNodeName", leaderCoreNodeName);
+    }
 
     byte[] znodeData = Utils.toJSON(stateObj);
 
     try {
       if (state == Replica.State.DOWN) {
-        markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
+        markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData, retryOnConnLoss);
       } else {
+        // must retry on conn loss otherwise future election attempts may assume wrong LIR state
         if (zkClient.exists(znodePath, true)) {
-          zkClient.setData(znodePath, znodeData, true);
+          zkClient.setData(znodePath, znodeData, retryOnConnLoss);
         } else {
-          zkClient.makePath(znodePath, znodeData, true);
+          zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
         }
       }
       log.info("Wrote {} to {}", state.toString(), znodePath);
@@ -2172,22 +2185,23 @@ public final class ZkController {
    * doesn't let us mark a node as down *after* we've already lost our session
    */
   private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
-                                       String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+                                       String znodePath, byte[] znodeData,
+                                       boolean retryOnConnLoss) throws KeeperException, InterruptedException {
     String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
     if (leaderSeqPath == null) {
-      throw new SolrException(ErrorCode.SERVER_ERROR,
+      throw new NotLeaderException(ErrorCode.SERVER_ERROR,
           "Failed to update data to 'down' for znode: " + znodePath +
               " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
     }
-    if (zkClient.exists(znodePath, true)) {
+    if (zkClient.exists(znodePath, retryOnConnLoss)) {
       List<Op> ops = new ArrayList<>(2);
       ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
       ops.add(Op.setData(znodePath, znodeData, -1));
-      zkClient.multi(ops, true);
+      zkClient.multi(ops, retryOnConnLoss);
     } else {
       String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
       try {
-        zkClient.makePath(parentZNodePath, true);
+        zkClient.makePath(parentZNodePath, retryOnConnLoss);
       } catch (KeeperException.NodeExistsException nee) {
         // if it exists, that's great!
       }
@@ -2195,7 +2209,7 @@ public final class ZkController {
       ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
       ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
           CreateMode.PERSISTENT));
-      zkClient.multi(ops, true);
+      zkClient.multi(ops, retryOnConnLoss);
     }
   }
 
@@ -2473,4 +2487,13 @@ public final class ZkController {
     ElectionContext context = electionContexts.get(key);
     return context != null ? context.leaderSeqPath : null;
   }
+
+  /**
+   * Thrown during leader initiated recovery process if current node is not leader
+   */
+  public static class NotLeaderException extends SolrException  {
+    public NotLeaderException(ErrorCode code, String msg) {
+      super(code, msg);
+    }
+  }
 }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Sep  9 18:07:45 2015
@@ -17,6 +17,21 @@ package org.apache.solr.update.processor
  * limitations under the License.
  */
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -49,7 +64,6 @@ import org.apache.solr.common.params.Upd
 import org.apache.solr.common.util.Hash;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.handler.component.RealTimeGetComponent;
 import org.apache.solr.request.SolrQueryRequest;
@@ -72,23 +86,6 @@ import org.apache.solr.update.VersionInf
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
@@ -836,8 +833,6 @@ public class DistributedUpdateProcessor
         break;
       }
 
-      int maxTries = 1;       
-      boolean sendRecoveryCommand = true;
       String collection = null;
       String shardId = null;
 
@@ -878,33 +873,24 @@ public class DistributedUpdateProcessor
         if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) && foundErrorNodeInReplicaList) {
           try {
             // if false, then the node is probably not "live" anymore
-            sendRecoveryCommand =
-                zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
-                    shardId,
-                    stdNode.getNodeProps(),
-                    leaderCoreNodeName,
-                    false /* forcePublishState */,
-                    false /* retryOnConnLoss */
-                );
-
-            // we want to try more than once, ~10 minutes
-            if (sendRecoveryCommand) {
-              maxTries = 120;
-            } // else the node is no longer "live" so no need to send any recovery command
+            // and we do not need to send a recovery message
+            Throwable rootCause = SolrException.getRootCause(error.e);
+            log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+            zkController.ensureReplicaInLeaderInitiatedRecovery(
+                req.getCore().getCoreDescriptor().getCoreContainer(),
+                collection,
+                shardId,
+                stdNode.getNodeProps(),
+                leaderCoreNodeName,
+                false /* forcePublishState */
+            );
           } catch (Exception exc) {
             Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
             log.error("Leader failed to set replica " +
                 error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
-            if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException ||
-                setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException) {
-              // our session is expired, which means our state is suspect, so don't go
-              // putting other replicas in recovery (see SOLR-6511)
-              sendRecoveryCommand = false;
-            } // else will go ahead and try to send the recovery command once after this error
           }
         } else {
           // not the leader anymore maybe or the error'd node is not my replica?
-          sendRecoveryCommand = false;
           if (!foundErrorNodeInReplicaList) {
             log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
                 shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
@@ -914,30 +900,6 @@ public class DistributedUpdateProcessor
                 shardId+", no request recovery command will be sent!");
           }
         }
-      } // else not a StdNode, recovery command still gets sent once
-            
-      if (!sendRecoveryCommand)
-        continue; // the replica is already in recovery handling or is not live   
-
-      Throwable rootCause = SolrException.getRootCause(error.e);
-      log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
-
-      // try to send the recovery command to the downed replica in a background thread
-      CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
-      LeaderInitiatedRecoveryThread lirThread =
-          new LeaderInitiatedRecoveryThread(zkController,
-              coreContainer,
-              collection,
-              shardId,
-              error.req.node.getNodeProps(),
-              maxTries,
-              cloudDesc.getCoreNodeName()); // core node name of current leader
-      ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
-      try {
-        MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", error.req.node.getNodeProps().getCoreUrl());
-        executor.execute(lirThread);
-      } finally {
-        MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
       }
     }
 

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Sep  9 18:07:45 2015
@@ -140,14 +140,10 @@ public class HttpPartitionTest extends A
     ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
     String replicaUrl = replicaCoreNodeProps.getCoreUrl();
 
-    assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
-    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, leader.getName(), false, true));
-    assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
+    zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, leader.getName(), true);
     Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);
     assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
-    zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
-    assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
 
     // test old non-json format handling
     SolrZkClient zkClient = zkController.getZkClient();

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java?rev=1702067&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java Wed Sep  9 18:07:45 2015
@@ -0,0 +1,224 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Test for {@link LeaderInitiatedRecoveryThread}
+ */
+@SolrTestCaseJ4.SuppressSSL
+public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {
+
+  public TestLeaderInitiatedRecoveryThread() {
+    sliceCount = 1;
+    fixShardCount(2);
+  }
+
+  public void testPublishDownState() throws Exception {
+    waitForRecoveriesToFinish(true);
+
+    final String leaderCoreNodeName = shardToLeaderJetty.get(SHARD1).coreNodeName;
+    final CloudJettyRunner leaderRunner = shardToLeaderJetty.get(SHARD1);
+    SolrDispatchFilter filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
+    ZkController zkController = filter.getCores().getZkController();
+
+    CloudJettyRunner notLeader = null;
+    for (CloudJettyRunner cloudJettyRunner : shardToJetty.get(SHARD1)) {
+      if (cloudJettyRunner != leaderRunner) {
+        notLeader = cloudJettyRunner;
+        break;
+      }
+    }
+    assertNotNull(notLeader);
+    Replica replica = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, notLeader.coreNodeName);
+    ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica);
+
+    /*
+     1. Test that publishDownState throws exception when zkController.isReplicaInRecoveryHandling == false
+      */
+    try {
+      LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+          DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+      assertFalse(zkController.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl()));
+      thread.run();
+      fail("publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController");
+    } catch (SolrException e) {
+      assertTrue(e.code() == SolrException.ErrorCode.INVALID_STATE.code);
+    }
+
+
+    /*
+     2. Test that a non-live replica cannot be put into LIR or down state
+      */
+    LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+    // kill the replica
+    int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size();
+    ChaosMonkey.stop(notLeader.jetty);
+    TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
+    while (!timeOut.hasTimedOut()) {
+      if (children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size()) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+    assertTrue(children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size());
+
+    int cversion = getOverseerCversion();
+    // Thread should not publish LIR and down state for node which is not live, regardless of whether forcePublish is true or false
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+    // lets assert that we did not publish anything to overseer queue, simplest way is to assert that cversion of overseer queue zk node is still the same
+    assertEquals(cversion, getOverseerCversion());
+
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+    // lets assert that we did not publish anything to overseer queue
+    assertEquals(cversion, getOverseerCversion());
+
+
+    /*
+    3. Test that if ZK connection loss then thread should not attempt to publish down state even if forcePublish=true
+     */
+    ChaosMonkey.start(notLeader.jetty);
+    waitForRecoveriesToFinish(true);
+
+    thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+      @Override
+      protected void updateLIRState(String replicaCoreNodeName) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException());
+      }
+    };
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+    assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+
+
+    /*
+     4. Test that if ZK connection loss or session expired then thread should not attempt to publish down state even if forcePublish=true
+      */
+    thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+      @Override
+      protected void updateLIRState(String replicaCoreNodeName) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException());
+      }
+    };
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+    assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+
+
+    /*
+     5. Test that any exception other then ZK connection loss or session expired should publish down state only if forcePublish=true
+      */
+    thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+      @Override
+      protected void updateLIRState(String replicaCoreNodeName) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception");
+      }
+    };
+    // the following should return true because regardless of the bogus exception in setting LIR state, we still want recovery commands to be sent,
+    // however the following will not publish a down state
+    cversion = getOverseerCversion();
+    assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+
+    // lets assert that we did not publish anything to overseer queue, simplest way is to assert that cversion of overseer queue zk node is still the same
+    assertEquals(cversion, getOverseerCversion());
+
+    assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+    // this should have published a down state so assert that cversion has incremented
+    assertTrue(getOverseerCversion() > cversion);
+
+    timeOut = new TimeOut(30, TimeUnit.SECONDS);
+    while (!timeOut.hasTimedOut()) {
+      cloudClient.getZkStateReader().updateClusterState();
+      Replica r = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, replica.getName());
+      if (r.getState() == Replica.State.DOWN) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+
+    assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+    assertEquals(Replica.State.DOWN, cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, replica.getName()).getState());
+
+    /*
+    6. Test that non-leader cannot set LIR nodes
+     */
+    filter = (SolrDispatchFilter) notLeader.jetty.getDispatchFilter().getFilter();
+    zkController = filter.getCores().getZkController();
+
+    thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+      @Override
+      protected void updateLIRState(String replicaCoreNodeName) {
+        try {
+          super.updateLIRState(replicaCoreNodeName);
+        } catch (Exception e) {
+          assertTrue(e instanceof ZkController.NotLeaderException);
+          throw e;
+        }
+      }
+    };
+    cversion = getOverseerCversion();
+    assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+    assertEquals(cversion, getOverseerCversion());
+
+    /*
+     7. assert that we can write a LIR state if everything else is fine
+      */
+    // reset the zkcontroller to the one from the leader
+    filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
+    zkController = filter.getCores().getZkController();
+    thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+    thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false);
+    timeOut = new TimeOut(30, TimeUnit.SECONDS);
+    while (!timeOut.hasTimedOut()) {
+      Replica.State state = zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName());
+      if (state == Replica.State.DOWN) {
+        break;
+      }
+      Thread.sleep(500);
+    }
+    assertNotNull(zkController.getLeaderInitiatedRecoveryStateObject(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+    assertEquals(Replica.State.DOWN, zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+
+    /*
+    7. Test that
+     */
+  }
+
+  protected int getOverseerCversion() throws KeeperException, InterruptedException {
+    Stat stat = new Stat();
+    cloudClient.getZkStateReader().getZkClient().getData("/overseer/queue", null, stat, true);
+    return stat.getCversion();
+  }
+
+}

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Wed Sep  9 18:07:45 2015
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @Slow
+@SolrTestCaseJ4.SuppressSSL
 public class ZkControllerTest extends SolrTestCaseJ4 {
 
   private static final String COLLECTION_NAME = "collection1";
@@ -243,64 +244,6 @@ public class ZkControllerTest extends So
       } finally {
         if (zkController != null)
           zkController.close();
-      }
-    } finally {
-      if (cc != null) {
-        cc.shutdown();
-      }
-      server.shutdown();
-    }
-  }
-
-  /*
-  Test that:
-  1) LIR state to 'down' is not set unless publishing node is a leader
-    1a) Test that leader can publish when LIR node already exists in zk
-    1b) Test that leader can publish when LIR node does not exist - TODO
-  2) LIR state to 'active' or 'recovery' can be set regardless of whether publishing
-    node is leader or not - TODO
-   */
-  public void testEnsureReplicaInLeaderInitiatedRecovery() throws Exception  {
-    String zkDir = createTempDir("testEnsureReplicaInLeaderInitiatedRecovery").toFile().getAbsolutePath();
-    CoreContainer cc = null;
-
-    ZkTestServer server = new ZkTestServer(zkDir);
-    try {
-      server.run();
-
-      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
-      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
-      cc = getCoreContainer();
-      ZkController zkController = null;
-
-      try {
-        CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
-        zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
-
-          @Override
-          public List<CoreDescriptor> getCurrentDescriptors() {
-            // do nothing
-            return null;
-          }
-        });
-        HashMap<String, Object> propMap = new HashMap<>();
-        propMap.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1:8983/solr");
-        propMap.put(ZkStateReader.CORE_NAME_PROP, "replica1");
-        propMap.put(ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
-        Replica replica = new Replica("replica1", propMap);
-        try {
-          // this method doesn't throw exception when node isn't leader
-          zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
-              new ZkCoreNodeProps(replica), "non_existent_leader", false, false);
-          fail("ZkController should not write LIR state for node which is not leader");
-        } catch (Exception e) {
-          assertNull("ZkController should not write LIR state for node which is not leader",
-              zkController.getLeaderInitiatedRecoveryState("c1", "shard1", "replica1"));
-        }
-      } finally {
-        if (zkController != null)
-          zkController.close();
       }
     } finally {
       if (cc != null) {