You are viewing a plain-text version of this content. (The original page linked to the canonical version at this point; that hyperlink is not preserved in plain text.)
Posted to commits@lucene.apache.org by ma...@apache.org on 2015/09/23 14:36:03 UTC

svn commit: r1704836 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/

Author: markrmiller
Date: Wed Sep 23 12:36:02 2015
New Revision: 1704836

URL: http://svn.apache.org/viewvc?rev=1704836&view=rev
Log:
SOLR-8069: Ensure that only the valid ZooKeeper registered leader can put a replica into Leader Initiated Recovery.

Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep 23 12:36:02 2015
@@ -210,6 +210,9 @@ Bug Fixes
 * SOLR-8058: Fix the exclusion filter so that collections that start with js, css, img, tpl
   can be accessed. (Upayavira, Steve Rowe, Anshum Gupta)
 
+* SOLR-8069: Ensure that only the valid ZooKeeper registered leader can put a replica into Leader 
+  Initiated Recovery. (Mark Miller, Jessica Cheng, Anshum Gupta)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java Wed Sep 23 12:36:02 2015
@@ -43,7 +43,7 @@ public class CloudDescriptor {
   volatile Slice.State shardState = Slice.State.ACTIVE;
   volatile String shardParent = null;
 
-  volatile boolean isLeader = false;
+  private volatile boolean isLeader = false;
   volatile Replica.State lastPublished = Replica.State.ACTIVE;
 
   public static final String NUM_SHARDS = "numShards";

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep 23 12:36:02 2015
@@ -467,7 +467,7 @@ final class ShardLeaderElectionContext e
               }
               
               zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
-                  collection, shardId, coreNodeProps, coreNodeName,
+                  collection, shardId, coreNodeProps, core.getCoreDescriptor(),
                   false /* forcePublishState */);
             }              
           }

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Sep 23 12:36:02 2015
@@ -13,6 +13,7 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.KeeperException;
 import org.apache.solr.util.RTimer;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class LeaderInitiatedRecoveryThre
   protected String shardId;
   protected ZkCoreNodeProps nodeProps;
   protected int maxTries;
-  protected String leaderCoreNodeName;
+  private CoreDescriptor leaderCd;
   
   public LeaderInitiatedRecoveryThread(ZkController zkController, 
                                        CoreContainer cc, 
@@ -62,7 +63,7 @@ public class LeaderInitiatedRecoveryThre
                                        String shardId, 
                                        ZkCoreNodeProps nodeProps,
                                        int maxTries,
-                                       String leaderCoreNodeName)
+                                       CoreDescriptor leaderCd)
   {
     super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
     this.zkController = zkController;
@@ -71,8 +72,7 @@ public class LeaderInitiatedRecoveryThre
     this.shardId = shardId;    
     this.nodeProps = nodeProps;
     this.maxTries = maxTries;
-    this.leaderCoreNodeName = leaderCoreNodeName;
-    
+    this.leaderCd = leaderCd;
     setDaemon(true);
   }
   
@@ -171,7 +171,7 @@ public class LeaderInitiatedRecoveryThre
   protected void updateLIRState(String replicaCoreNodeName) {
     zkController.updateLeaderInitiatedRecoveryState(collection,
         shardId,
-        replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName, true);
+        replicaCoreNodeName, Replica.State.DOWN, leaderCd, true);
   }
 
   protected void sendRecoveryCommandWithRetry() throws Exception {    
@@ -257,6 +257,7 @@ public class LeaderInitiatedRecoveryThre
           break;
         }
 
+        String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
         // stop trying if I'm no longer the leader
         if (leaderCoreNodeName != null && collection != null) {
           String leaderCoreNodeNameFromZk = null;
@@ -273,6 +274,13 @@ public class LeaderInitiatedRecoveryThre
             continueTrying = false;
             break;
           }
+          if (!leaderCd.getCloudDescriptor().isLeader()) {
+            log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+                ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+                leaderCoreNodeName + " is no longer the leader!");
+            continueTrying = false;
+            break;
+          }
         }
 
         // additional safeguard against the replica trying to be in the active state
@@ -297,9 +305,9 @@ public class LeaderInitiatedRecoveryThre
                   " on node "+replicaNodeName+" ack'd the leader initiated recovery state, "
                       + "no need to keep trying to send recovery command");
             } else {
-              String leaderCoreNodeName = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
+              String lcnn = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
               List<ZkCoreNodeProps> replicaProps = 
-                  zkStateReader.getReplicaProps(collection, shardId, leaderCoreNodeName);
+                  zkStateReader.getReplicaProps(collection, shardId, lcnn);
               if (replicaProps != null && replicaProps.size() > 0) {
                 for (ZkCoreNodeProps prop : replicaProps) {
                   final Replica replica = (Replica) prop.getNodeProps();

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Sep 23 12:36:02 2015
@@ -1205,7 +1205,7 @@ public final class ZkController {
           if (state == Replica.State.ACTIVE) {
             // trying to become active, so leader-initiated state must be recovering
             if (lirState == Replica.State.RECOVERING) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null, true);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, cd, true);
             } else if (lirState == Replica.State.DOWN) {
               throw new SolrException(ErrorCode.INVALID_STATE,
                   "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
@@ -1213,7 +1213,7 @@ public final class ZkController {
           } else if (state == Replica.State.RECOVERING) {
             // if it is currently DOWN, then trying to enter into recovering state is good
             if (lirState == Replica.State.DOWN) {
-              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null, true);
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, cd, true);
             }
           }
         }
@@ -1981,7 +1981,7 @@ public final class ZkController {
   public boolean ensureReplicaInLeaderInitiatedRecovery(
       final CoreContainer container,
       final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
-      String leaderCoreNodeName, boolean forcePublishState)
+      CoreDescriptor leaderCd, boolean forcePublishState)
       throws KeeperException, InterruptedException {
     final String replicaUrl = replicaCoreProps.getCoreUrl();
 
@@ -2020,7 +2020,7 @@ public final class ZkController {
                 shardId,
                 replicaCoreProps,
                 120,
-                leaderCoreNodeName); // core node name of current leader
+                leaderCd);
         ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
         try {
           MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
@@ -2115,14 +2115,19 @@ public final class ZkController {
   }
 
   public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
-      Replica.State state, String leaderCoreNodeName, boolean retryOnConnLoss) {
+      Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "
           + state.toString() + " using: collection=" + collection
           + "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
       return; // if we don't have complete data about a core in cloud mode, do nothing
     }
+    
+    assert leaderCd != null;
+    assert leaderCd.getCloudDescriptor() != null;
 
+    String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
+    
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
 
     if (state == Replica.State.ACTIVE) {
@@ -2158,7 +2163,7 @@ public final class ZkController {
 
     try {
       if (state == Replica.State.DOWN) {
-        markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData, retryOnConnLoss);
+        markShardAsDownIfLeader(collection, shardId, leaderCd, znodePath, znodeData, retryOnConnLoss);
       } else {
         // must retry on conn loss otherwise future election attempts may assume wrong LIR state
         if (zkClient.exists(znodePath, true)) {
@@ -2184,18 +2189,36 @@ public final class ZkController {
    * in ZK. This ensures that a long running network partition caused by GC etc
    * doesn't let us mark a node as down *after* we've already lost our session
    */
-  private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
+  private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
                                        String znodePath, byte[] znodeData,
                                        boolean retryOnConnLoss) throws KeeperException, InterruptedException {
-    String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
-    if (leaderSeqPath == null) {
-      throw new NotLeaderException(ErrorCode.SERVER_ERROR,
-          "Failed to update data to 'down' for znode: " + znodePath +
-              " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
+    
+
+    if (!leaderCd.getCloudDescriptor().isLeader()) {
+      log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
+      throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
     }
+    
+    ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
+    ElectionContext context = electionContexts.get(key);
+    
+    // we make sure we locally think we are the leader before and after getting the context - then
+    // we only try zk if we still think we are the leader and have our leader context
+    if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
+      log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
+      throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
+    }
+    
+    // we think we are the leader - get the expected shard leader version
+    // we use this version and multi to ensure *only* the current zk registered leader
+    // for a shard can put a replica into LIR
+    
+    Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).leaderZkNodeParentVersion;
+    
+    // TODO: should we do this optimistically to avoid races?
     if (zkClient.exists(znodePath, retryOnConnLoss)) {
       List<Op> ops = new ArrayList<>(2);
-      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
       ops.add(Op.setData(znodePath, znodeData, -1));
       zkClient.multi(ops, retryOnConnLoss);
     } else {
@@ -2205,8 +2228,10 @@ public final class ZkController {
       } catch (KeeperException.NodeExistsException nee) {
         // if it exists, that's great!
       }
+      
+      // we only create the entry if the context we are using is registered as the current leader in ZK
       List<Op> ops = new ArrayList<>(2);
-      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
       ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
           CreateMode.PERSISTENT));
       zkClient.multi(ops, retryOnConnLoss);

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Sep 23 12:36:02 2015
@@ -886,7 +886,7 @@ public class DistributedUpdateProcessor
                 collection,
                 shardId,
                 stdNode.getNodeProps(),
-                leaderCoreNodeName,
+                req.getCore().getCoreDescriptor(),
                 false /* forcePublishState */
             );
           } catch (Exception exc) {

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Sep 23 12:36:02 2015
@@ -17,6 +17,19 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -40,23 +53,12 @@ import org.apache.solr.core.CoreContaine
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.solr.update.UpdateLog;
+import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
 import org.apache.solr.util.RTimer;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Simulates HTTP partitions between a leader and replica but the replica does
  * not lose its ZooKeeper connection.
@@ -145,7 +147,22 @@ public class HttpPartitionTest extends A
     ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
     String replicaUrl = replicaCoreNodeProps.getCoreUrl();
 
-    zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, leader.getName(), true);
+    MockCoreDescriptor cd = new MockCoreDescriptor() {
+      public CloudDescriptor getCloudDescriptor() {
+        return new CloudDescriptor(leader.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) {
+          @Override
+          public String getCoreNodeName() {
+            return leader.getName();
+          }
+          @Override
+          public boolean isLeader() {
+            return true;
+          }
+        };
+      }
+    };
+    
+    zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, cd, true);
     Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);
     assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java?rev=1704836&r1=1704835&r2=1704836&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java Wed Sep 23 12:36:02 2015
@@ -1,5 +1,7 @@
 package org.apache.solr.cloud;
 
+import java.util.Properties;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -23,8 +25,10 @@ import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.servlet.SolrDispatchFilter;
 import org.apache.solr.util.TimeOut;
+import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 
@@ -57,13 +61,29 @@ public class TestLeaderInitiatedRecovery
     assertNotNull(notLeader);
     Replica replica = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, notLeader.coreNodeName);
     ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica);
+    
+    MockCoreDescriptor cd = new MockCoreDescriptor() {
+      public CloudDescriptor getCloudDescriptor() {
+        return new CloudDescriptor(shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NAME_PROP), new Properties(), this) {
+          @Override
+          public String getCoreNodeName() {
+            return shardToLeaderJetty.get(SHARD1).info.getStr(ZkStateReader.CORE_NODE_NAME_PROP);
+          }
+          @Override
+          public boolean isLeader() {
+            return true;
+          }
+        };
+      }
+    };
 
     /*
      1. Test that publishDownState throws exception when zkController.isReplicaInRecoveryHandling == false
       */
     try {
+      
       LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-          DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+          DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd);
       assertFalse(zkController.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl()));
       thread.run();
       fail("publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController");
@@ -76,7 +96,7 @@ public class TestLeaderInitiatedRecovery
      2. Test that a non-live replica cannot be put into LIR or down state
       */
     LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd);
     // kill the replica
     int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size();
     ChaosMonkey.stop(notLeader.jetty);
@@ -107,7 +127,7 @@ public class TestLeaderInitiatedRecovery
     waitForRecoveriesToFinish(true);
 
     thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
       @Override
       protected void updateLIRState(String replicaCoreNodeName) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException());
@@ -122,7 +142,7 @@ public class TestLeaderInitiatedRecovery
      4. Test that if ZK connection loss or session expired then thread should not attempt to publish down state even if forcePublish=true
       */
     thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
       @Override
       protected void updateLIRState(String replicaCoreNodeName) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException());
@@ -137,7 +157,7 @@ public class TestLeaderInitiatedRecovery
      5. Test that any exception other then ZK connection loss or session expired should publish down state only if forcePublish=true
       */
     thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, cd) {
       @Override
       protected void updateLIRState(String replicaCoreNodeName) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception");
@@ -171,11 +191,12 @@ public class TestLeaderInitiatedRecovery
     /*
     6. Test that non-leader cannot set LIR nodes
      */
+    
     filter = (SolrDispatchFilter) notLeader.jetty.getDispatchFilter().getFilter();
     zkController = filter.getCores().getZkController();
 
     thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, filter.getCores().getCores().iterator().next().getCoreDescriptor()) {
       @Override
       protected void updateLIRState(String replicaCoreNodeName) {
         try {
@@ -197,7 +218,7 @@ public class TestLeaderInitiatedRecovery
     filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
     zkController = filter.getCores().getZkController();
     thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
-        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+        DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, filter.getCores().getCores().iterator().next().getCoreDescriptor());
     thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false);
     timeOut = new TimeOut(30, TimeUnit.SECONDS);
     while (!timeOut.hasTimedOut()) {