You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by rj...@apache.org on 2015/03/31 07:22:50 UTC

svn commit: r1670257 [21/39] - in /lucene/dev/branches/lucene6271: ./ dev-tools/ dev-tools/idea/.idea/libraries/ dev-tools/scripts/ lucene/ lucene/analysis/ lucene/analysis/common/ lucene/analysis/common/src/java/org/apache/lucene/analysis/miscellaneou...

Modified: lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1670257&r1=1670256&r2=1670257&view=diff
==============================================================================
--- lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene6271/solr/core/src/java/org/apache/solr/cloud/ZkController.java Tue Mar 31 05:22:40 2015
@@ -45,12 +45,15 @@ import org.apache.solr.common.cloud.ZkSt
 import org.apache.solr.common.cloud.ZooKeeperException;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.common.util.URLUtil;
 import org.apache.solr.core.CloseHook;
+import org.apache.solr.core.CloudConfig;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.component.ShardHandler;
+import org.apache.solr.logging.MDCUtils;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateShardHandler;
 import org.apache.zookeeper.CreateMode;
@@ -58,11 +61,13 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -98,29 +103,20 @@ import static org.apache.solr.common.clo
 import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 
-import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
-import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
-
 /**
  * Handle ZooKeeper interactions.
- * 
+ * <p>
  * notes: loads everything on init, creates what's not there - further updates
  * are prompted with Watches.
- * 
+ * <p>
  * TODO: exceptions during close on attempts to update cloud state
- * 
  */
 public final class ZkController {
 
   private static Logger log = LoggerFactory.getLogger(ZkController.class);
 
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
-  
+
   private final DistributedQueue overseerJobQueue;
   private final DistributedQueue overseerCollectionQueue;
 
@@ -128,19 +124,19 @@ public final class ZkController {
   private final DistributedMap overseerCompletedMap;
   private final DistributedMap overseerFailureMap;
 
-  public final static String COLLECTION_PARAM_PREFIX="collection.";
-  public final static String CONFIGNAME_PROP="configName";
+  public final static String COLLECTION_PARAM_PREFIX = "collection.";
+  public final static String CONFIGNAME_PROP = "configName";
 
   static class ContextKey {
 
     private String collection;
     private String coreNodeName;
-    
+
     public ContextKey(String collection, String coreNodeName) {
       this.collection = collection;
       this.coreNodeName = coreNodeName;
     }
-    
+
     @Override
     public int hashCode() {
       final int prime = 31;
@@ -167,25 +163,26 @@ public final class ZkController {
       return true;
     }
   }
-  private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<ContextKey, ElectionContext>());
-  
+
+  private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
+
   private final SolrZkClient zkClient;
   private final ZkCmdExecutor cmdExecutor;
   private final ZkStateReader zkStateReader;
 
   private final LeaderElector leaderElector;
-  
+
   private final String zkServerAddress;          // example: 127.0.0.1:54062/solr
 
-  private final String localHostPort;      // example: 54065
-  private final String localHostContext;   // example: solr
+  private final int localHostPort;      // example: 54065
   private final String hostName;           // example: 127.0.0.1
   private final String nodeName;           // example: 127.0.0.1:54065_solr
   private final String baseURL;            // example: http://127.0.0.1:54065/solr
 
+  private final CloudConfig cloudConfig;
 
   private LeaderElector overseerElector;
-  
+
 
   // for now, this can be null in tests, in which case recovery will be inactive, and other features
   // may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -195,16 +192,16 @@ public final class ZkController {
 
   private int leaderVoteWait;
   private int leaderConflictResolveWait;
-  
+
   private boolean genericCoreNodeNames;
 
   private int clientTimeout;
 
   private volatile boolean isClosed;
-  
+
   // keeps track of replicas that have been asked to recover by leaders running on this node
-  private final Map<String,String> replicasInLeaderInitiatedRecovery = new HashMap<String,String>();
-  
+  private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
+
   // This is an expert and unsupported development mode that does not create
   // an Overseer or register a /live node. This let's you monitor the cluster
   // and interact with zookeeper via the Solr admin UI on a node outside the cluster,
@@ -214,33 +211,33 @@ public final class ZkController {
   // keeps track of a list of objects that need to know a new ZooKeeper session was created after expiration occurred
   private List<OnReconnect> reconnectListeners = new ArrayList<OnReconnect>();
 
-  public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientTimeout, int zkClientConnectTimeout, String localHost, String locaHostPort,
-                      String localHostContext, int leaderVoteWait, int leaderConflictResolveWait, boolean genericCoreNodeNames, final CurrentCoreDescriptorProvider registerOnReconnect)
+  public ZkController(final CoreContainer cc, String zkServerAddress, int zkClientConnectTimeout, CloudConfig cloudConfig, final CurrentCoreDescriptorProvider registerOnReconnect)
       throws InterruptedException, TimeoutException, IOException {
 
     if (cc == null) throw new IllegalArgumentException("CoreContainer cannot be null.");
     this.cc = cc;
-    this.genericCoreNodeNames = genericCoreNodeNames;
+
+    this.cloudConfig = cloudConfig;
+
+    this.genericCoreNodeNames = cloudConfig.getGenericCoreNodeNames();
+
     // be forgiving and strip this off leading/trailing slashes
     // this allows us to support users specifying hostContext="/" in 
     // solr.xml to indicate the root context, instead of hostContext="" 
     // which means the default of "solr"
-    localHostContext = trimLeadingAndTrailingSlashes(localHostContext);
+    String localHostContext = trimLeadingAndTrailingSlashes(cloudConfig.getSolrHostContext());
 
     this.zkServerAddress = zkServerAddress;
-    this.localHostPort = locaHostPort;
-    this.localHostContext = localHostContext;
-    this.hostName = normalizeHostName(localHost);
-    this.nodeName = generateNodeName(this.hostName,
-        this.localHostPort,
-        this.localHostContext);
+    this.localHostPort = cloudConfig.getSolrHostPort();
+    this.hostName = normalizeHostName(cloudConfig.getHost());
+    this.nodeName = generateNodeName(this.hostName, Integer.toString(this.localHostPort), localHostContext);
 
-    this.leaderVoteWait = leaderVoteWait;
-    this.leaderConflictResolveWait = leaderConflictResolveWait;
+    this.leaderVoteWait = cloudConfig.getLeaderVoteWait();
+    this.leaderConflictResolveWait = cloudConfig.getLeaderConflictResolveWait();
 
-    this.clientTimeout = zkClientTimeout;
+    this.clientTimeout = cloudConfig.getZkClientTimeout();
     DefaultConnectionStrategy strat = new DefaultConnectionStrategy();
-    String zkACLProviderClass = cc.getConfig().getZkACLProviderClass();
+    String zkACLProviderClass = cloudConfig.getZkACLProviderClass();
     ZkACLProvider zkACLProvider = null;
     if (zkACLProviderClass != null && zkACLProviderClass.trim().length() > 0) {
       zkACLProvider = cc.getResourceLoader().newInstance(zkACLProviderClass, ZkACLProvider.class);
@@ -248,7 +245,7 @@ public final class ZkController {
       zkACLProvider = new DefaultZkACLProvider();
     }
 
-    String zkCredentialsProviderClass = cc.getConfig().getZkCredentialsProviderClass();
+    String zkCredentialsProviderClass = cloudConfig.getZkCredentialsProviderClass();
     if (zkCredentialsProviderClass != null && zkCredentialsProviderClass.trim().length() > 0) {
       strat.setZkCredentialsToAddAutomatically(cc.getResourceLoader().newInstance(zkCredentialsProviderClass, ZkCredentialsProvider.class));
     } else {
@@ -256,8 +253,7 @@ public final class ZkController {
     }
     addOnReconnectListener(getConfigDirListener());
 
-    zkClient = new SolrZkClient(zkServerAddress, zkClientTimeout,
-        zkClientConnectTimeout, strat,
+    zkClient = new SolrZkClient(zkServerAddress, clientTimeout, zkClientConnectTimeout, strat,
         // on reconnect, reload cloud info
         new OnReconnect() {
 
@@ -362,7 +358,7 @@ public final class ZkController {
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
-    cmdExecutor = new ZkCmdExecutor(zkClientTimeout);
+    cmdExecutor = new ZkCmdExecutor(clientTimeout);
     leaderElector = new LeaderElector(zkClient);
     zkStateReader = new ZkStateReader(zkClient);
 
@@ -374,7 +370,7 @@ public final class ZkController {
   public int getLeaderVoteWait() {
     return leaderVoteWait;
   }
-  
+
   public int getLeaderConflictResolveWait() {
     return leaderConflictResolveWait;
   }
@@ -408,7 +404,7 @@ public final class ZkController {
           }
         }
       }
-        
+
       for (CoreDescriptor descriptor : descriptors) {
         // if it looks like we are going to be the leader, we don't
         // want to wait for the following stuff
@@ -416,7 +412,7 @@ public final class ZkController {
         String collection = cloudDesc.getCollectionName();
         String slice = cloudDesc.getShardId();
         try {
-          
+
           int children = zkStateReader
               .getZkClient()
               .getChildren(
@@ -439,7 +435,7 @@ public final class ZkController {
 
         final String coreZkNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
         try {
-          log.debug("calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", new Object[] {coreZkNodeName,  collection, slice});
+          log.debug("calling waitForLeaderToSeeDownState for coreZkNodeName={} collection={} shard={}", new Object[]{coreZkNodeName, collection, slice});
           waitForLeaderToSeeDownState(descriptor, coreZkNodeName);
         } catch (Exception e) {
           SolrException.log(log, "", e);
@@ -455,7 +451,7 @@ public final class ZkController {
       }
     }
   }
-  
+
   private void markAllAsNotLeader(
       final CurrentCoreDescriptorProvider registerOnReconnect) {
     List<CoreDescriptor> descriptors = registerOnReconnect
@@ -503,7 +499,7 @@ public final class ZkController {
         }
       }
     }
-    
+
   }
 
   /**
@@ -534,7 +530,7 @@ public final class ZkController {
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
           "Config file contains no data:" + zkPath);
     }
-    
+
     return bytes;
   }
 
@@ -572,19 +568,19 @@ public final class ZkController {
       }
       host = hostaddress;
     } else {
-      if(URLUtil.hasScheme(host)) {
+      if (URLUtil.hasScheme(host)) {
         host = URLUtil.removeScheme(host);
       }
     }
 
     return host;
   }
-  
+
   public String getHostName() {
     return hostName;
   }
-  
-  public String getHostPort() {
+
+  public int getHostPort() {
     return localHostPort;
   }
 
@@ -599,42 +595,55 @@ public final class ZkController {
     return zkServerAddress;
   }
 
+  /**
+   * Create the zknodes necessary for a cluster to operate
+   *
+   * @param zkClient a SolrZkClient
+   * @throws KeeperException      if there is a Zookeeper error
+   * @throws InterruptedException on interrupt
+   */
+  public static void createClusterZkNodes(SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zkClient.getZkClientTimeout());
+    cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.ALIASES, zkClient);
+    cmdExecutor.ensureExists(ZkStateReader.CLUSTER_STATE, zkClient);
+  }
+
   private void init(CurrentCoreDescriptorProvider registerOnReconnect) {
 
     try {
       boolean createdWatchesAndUpdated = false;
       Stat stat = zkClient.exists(ZkStateReader.LIVE_NODES_ZKNODE, null, true);
-      if (stat!= null && stat.getNumChildren()>0) {
+      if (stat != null && stat.getNumChildren() > 0) {
         zkStateReader.createClusterStateWatchersAndUpdate();
         createdWatchesAndUpdated = true;
         publishAndWaitForDownStates();
       }
-      
-      // makes nodes zkNode
-      cmdExecutor.ensureExists(ZkStateReader.LIVE_NODES_ZKNODE, zkClient);
-      
+
+      createClusterZkNodes(zkClient);
+
       createEphemeralLiveNode();
-      cmdExecutor.ensureExists(ZkStateReader.COLLECTIONS_ZKNODE, zkClient);
 
       ShardHandler shardHandler;
       UpdateShardHandler updateShardHandler;
       shardHandler = cc.getShardHandlerFactory().getShardHandler();
       updateShardHandler = cc.getUpdateShardHandler();
-      
+
       if (!zkRunOnly) {
         overseerElector = new LeaderElector(zkClient);
         this.overseer = new Overseer(shardHandler, updateShardHandler,
-            CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cc.getConfig());
+            CoreContainer.CORES_HANDLER_PATH, zkStateReader, this, cloudConfig);
         ElectionContext context = new OverseerElectionContext(zkClient,
             overseer, getNodeName());
         overseerElector.setup(context);
         overseerElector.joinElection(context, false);
       }
-      
+
       if (!createdWatchesAndUpdated) {
         zkStateReader.createClusterStateWatchersAndUpdate();
       }
-      
+
     } catch (IOException e) {
       log.error("", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
@@ -655,7 +664,7 @@ public final class ZkController {
 
   public void publishAndWaitForDownStates() throws KeeperException,
       InterruptedException {
-    
+
     ClusterState clusterState = zkStateReader.getClusterState();
     Set<String> collections = clusterState.getCollections();
     List<String> updatedNodes = new ArrayList<>();
@@ -667,7 +676,7 @@ public final class ZkController {
         for (Replica replica : replicas) {
           if (getNodeName().equals(replica.getNodeName())
               && !(replica.getStr(ZkStateReader.STATE_PROP)
-                  .equals(ZkStateReader.DOWN))) {
+              .equals(ZkStateReader.DOWN))) {
             ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
                 ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
                 ZkStateReader.BASE_URL_PROP, getBaseUrl(),
@@ -686,7 +695,7 @@ public final class ZkController {
         }
       }
     }
-    
+
     // now wait till the updates are in our state
     long now = System.nanoTime();
     long timeout = now + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
@@ -703,12 +712,12 @@ public final class ZkController {
             if (replica.getStr(ZkStateReader.STATE_PROP).equals(
                 ZkStateReader.DOWN)) {
               updatedNodes.remove(replica.getStr(ZkStateReader.CORE_NAME_PROP));
-              
+
             }
           }
         }
       }
-      
+
       if (updatedNodes.size() == 0) {
         foundStates = true;
         Thread.sleep(1000);
@@ -719,16 +728,16 @@ public final class ZkController {
     if (!foundStates) {
       log.warn("Timed out waiting to see all nodes published as DOWN in our cluster state.");
     }
-    
+
   }
-  
+
   /**
    * Validates if the chroot exists in zk (or if it is successfully created).
    * Optionally, if create is set to true this method will create the path in
    * case it doesn't exist
-   * 
+   *
    * @return true if the path exists or is created false if the path doesn't
-   *         exist and 'create' = false
+   * exist and 'create' = false
    */
   public static boolean checkChrootPath(String zkHost, boolean create)
       throws KeeperException, InterruptedException {
@@ -761,7 +770,7 @@ public final class ZkController {
     String nodeName = getNodeName();
     String nodePath = ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeName;
     log.info("Register node as live in ZooKeeper:" + nodePath);
-   
+
     try {
       boolean nodeDeleted = true;
       try {
@@ -787,9 +796,9 @@ public final class ZkController {
       if (e.code() != KeeperException.Code.NODEEXISTS) {
         throw e;
       }
-    }    
+    }
   }
-  
+
   public String getNodeName() {
     return nodeName;
   }
@@ -805,114 +814,120 @@ public final class ZkController {
 
   /**
    * Register shard with ZooKeeper.
-   * 
+   *
    * @return the shardId for the SolrCore
    */
-  public String register(String coreName, final CoreDescriptor desc) throws Exception {  
+  public String register(String coreName, final CoreDescriptor desc) throws Exception {
     return register(coreName, desc, false, false);
   }
-  
+
 
   /**
    * Register shard with ZooKeeper.
-   * 
+   *
    * @return the shardId for the SolrCore
    */
-  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {  
+  public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores, boolean afterExpiration) throws Exception {
     // pre register has published our down state
-    
     final String baseUrl = getBaseUrl();
-    
+
     final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
     final String collection = cloudDesc.getCollectionName();
 
+    Map previousMDCContext = MDC.getCopyOfContextMap();
+    MDCUtils.setCollection(collection);
+
     final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
     assert coreZkNodeName != null : "we should have a coreNodeName by now";
-    
-    String shardId = cloudDesc.getShardId();
 
-    Map<String,Object> props = new HashMap<>();
- // we only put a subset of props into the leader node
+    String shardId = cloudDesc.getShardId();
+    MDCUtils.setShard(shardId);
+    Map<String, Object> props = new HashMap<>();
+    // we only put a subset of props into the leader node
     props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
     props.put(ZkStateReader.CORE_NAME_PROP, coreName);
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
 
 
     if (log.isInfoEnabled()) {
-        log.info("Register replica - core:" + coreName + " address:"
-            + baseUrl + " collection:" + cloudDesc.getCollectionName() + " shard:" + shardId);
+      log.info("Register replica - core:" + coreName + " address:"
+          + baseUrl + " collection:" + cloudDesc.getCollectionName() + " shard:" + shardId);
     }
 
     ZkNodeProps leaderProps = new ZkNodeProps(props);
-    
+
     try {
-      // If we're a preferred leader, insert ourselves at the head of the queue
-      boolean joinAtHead = false;
-      Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
-      if (replica != null) {
-        joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+      try {
+        // If we're a preferred leader, insert ourselves at the head of the queue
+        boolean joinAtHead = false;
+        Replica replica = zkStateReader.getClusterState().getReplica(desc.getCloudDescriptor().getCollectionName(), coreZkNodeName);
+        if (replica != null) {
+          joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+        }
+        joinElection(desc, afterExpiration, joinAtHead);
+      } catch (InterruptedException e) {
+        // Restore the interrupted status
+        Thread.currentThread().interrupt();
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
+      } catch (KeeperException | IOException e) {
+        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
       }
-      joinElection(desc, afterExpiration, joinAtHead);
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-    } catch (KeeperException | IOException e) {
-      throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
-    }
 
 
-    // in this case, we want to wait for the leader as long as the leader might 
-    // wait for a vote, at least - but also long enough that a large cluster has
-    // time to get its act together
-    String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);
-    
-    String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
-    log.info("We are " + ourUrl + " and leader is " + leaderUrl);
-    boolean isLeader = leaderUrl.equals(ourUrl);
-
-    try (SolrCore core = cc.getCore(desc.getName())) {
-
-      // recover from local transaction log and wait for it to complete before
-      // going active
-      // TODO: should this be moved to another thread? To recoveryStrat?
-      // TODO: should this actually be done earlier, before (or as part of)
-      // leader election perhaps?
-
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-      if (!core.isReloaded() && ulog != null) {
-        // disable recovery in case shard is in construction state (for shard splits)
-        Slice slice = getClusterState().getSlice(collection, shardId);
-        if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) {
-          Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
-              .getUpdateLog().recoverFromLog();
-          if (recoveryFuture != null) {
-            log.info("Replaying tlog for "+ourUrl+" during startup... NOTE: This can take a while.");
-            recoveryFuture.get(); // NOTE: this could potentially block for
-            // minutes or more!
-            // TODO: public as recovering in the mean time?
-            // TODO: in the future we could do peersync in parallel with recoverFromLog
-          } else {
-            log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
+      // in this case, we want to wait for the leader as long as the leader might 
+      // wait for a vote, at least - but also long enough that a large cluster has
+      // time to get its act together
+      String leaderUrl = getLeader(cloudDesc, leaderVoteWait + 600000);
+
+      String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+      log.info("We are " + ourUrl + " and leader is " + leaderUrl);
+      boolean isLeader = leaderUrl.equals(ourUrl);
+
+      try (SolrCore core = cc.getCore(desc.getName())) {
+
+        // recover from local transaction log and wait for it to complete before
+        // going active
+        // TODO: should this be moved to another thread? To recoveryStrat?
+        // TODO: should this actually be done earlier, before (or as part of)
+        // leader election perhaps?
+
+        UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+        if (!core.isReloaded() && ulog != null) {
+          // disable recovery in case shard is in construction state (for shard splits)
+          Slice slice = getClusterState().getSlice(collection, shardId);
+          if (!Slice.CONSTRUCTION.equals(slice.getState()) || !isLeader) {
+            Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler()
+                .getUpdateLog().recoverFromLog();
+            if (recoveryFuture != null) {
+              log.info("Replaying tlog for " + ourUrl + " during startup... NOTE: This can take a while.");
+              recoveryFuture.get(); // NOTE: this could potentially block for
+              // minutes or more!
+              // TODO: public as recovering in the mean time?
+              // TODO: in the future we could do peersync in parallel with recoverFromLog
+            } else {
+              log.info("No LogReplay needed for core=" + core.getName() + " baseURL=" + baseUrl);
+            }
+          }
+          boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
+              collection, coreZkNodeName, shardId, leaderProps, core, cc);
+          if (!didRecovery) {
+            publish(desc, ZkStateReader.ACTIVE);
           }
-        }
-        boolean didRecovery = checkRecovery(coreName, desc, recoverReloadedCores, isLeader, cloudDesc,
-            collection, coreZkNodeName, shardId, leaderProps, core, cc);
-        if (!didRecovery) {
-          publish(desc, ZkStateReader.ACTIVE);
         }
       }
+
+      // make sure we have an update cluster state right away
+      zkStateReader.updateClusterState(true);
+      return shardId;
+    } finally {
+      MDCUtils.cleanupMDC(previousMDCContext);
     }
-    
-    // make sure we have an update cluster state right away
-    zkStateReader.updateClusterState(true);
-    return shardId;
   }
 
   // timeoutms is the timeout for the first call to get the leader - there is then
   // a longer wait to make sure that leader matches our local state
   private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
-    
+
     String collection = cloudDesc.getCollectionName();
     String shardId = cloudDesc.getShardId();
     // rather than look in the cluster state file, we go straight to the zknodes
@@ -922,14 +937,14 @@ public final class ZkController {
     try {
       leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
           .getCoreUrl();
-      
+
       // now wait until our currently cloud state contains the latest leader
       String clusterStateLeaderUrl = zkStateReader.getLeaderUrl(collection,
           shardId, timeoutms * 2); // since we found it in zk, we are willing to
-                                   // wait a while to find it in state
+      // wait a while to find it in state
       int tries = 0;
       final long msInSec = 1000L;
-      int maxTries = (int)Math.floor(leaderConflictResolveWait/msInSec);
+      int maxTries = (int) Math.floor(leaderConflictResolveWait / msInSec);
       while (!leaderUrl.equals(clusterStateLeaderUrl)) {
         if (tries > maxTries) {
           throw new SolrException(ErrorCode.SERVER_ERROR,
@@ -940,7 +955,7 @@ public final class ZkController {
         tries++;
         if (tries % 30 == 0) {
           String warnMsg = String.format(Locale.ENGLISH, "Still seeing conflicting information about the leader "
-              + "of shard %s for collection %s after %d seconds; our state says %s, but ZooKeeper says %s",
+                  + "of shard %s for collection %s after %d seconds; our state says %s, but ZooKeeper says %s",
               cloudDesc.getShardId(), collection, tries, clusterStateLeaderUrl, leaderUrl);
           log.warn(warnMsg);
         }
@@ -950,30 +965,30 @@ public final class ZkController {
         leaderUrl = getLeaderProps(collection, cloudDesc.getShardId(), timeoutms)
             .getCoreUrl();
       }
-      
+
     } catch (Exception e) {
       log.error("Error getting leader from zk", e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
           "Error getting leader from zk for shard " + shardId, e);
-    } 
+    }
     return leaderUrl;
   }
-  
+
   /**
    * Get leader props directly from zk nodes.
    */
   public ZkCoreNodeProps getLeaderProps(final String collection,
-      final String slice, int timeoutms) throws InterruptedException {
+                                        final String slice, int timeoutms) throws InterruptedException {
     return getLeaderProps(collection, slice, timeoutms, false);
   }
-  
+
   /**
    * Get leader props directly from zk nodes.
-   * 
+   *
    * @return leader props
    */
   public ZkCoreNodeProps getLeaderProps(final String collection,
-      final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
+                                        final String slice, int timeoutms, boolean failImmediatelyOnExpiration) throws InterruptedException {
     int iterCount = timeoutms / 1000;
     Exception exp = null;
     while (iterCount-- > 0) {
@@ -992,7 +1007,7 @@ public final class ZkController {
         }
         exp = e;
         Thread.sleep(1000);
-      }  catch (Exception e) {
+      } catch (Exception e) {
         exp = e;
         Thread.sleep(1000);
       }
@@ -1009,24 +1024,24 @@ public final class ZkController {
     // look for old context - if we find it, cancel it
     String collection = cd.getCloudDescriptor().getCollectionName();
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
-    
+
     ContextKey contextKey = new ContextKey(collection, coreNodeName);
-    
+
     ElectionContext prevContext = electionContexts.get(contextKey);
-    
+
     if (prevContext != null) {
       prevContext.cancelElection();
     }
-    
+
     String shardId = cd.getCloudDescriptor().getShardId();
-    
-    Map<String,Object> props = new HashMap<>();
+
+    Map<String, Object> props = new HashMap<>();
     // we only put a subset of props into the leader node
     props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
     props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
     props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    
- 
+
+
     ZkNodeProps ourProps = new ZkNodeProps(props);
 
 
@@ -1043,39 +1058,39 @@ public final class ZkController {
    * Returns whether or not a recovery was started
    */
   private boolean checkRecovery(String coreName, final CoreDescriptor desc,
-      boolean recoverReloadedCores, final boolean isLeader,
-      final CloudDescriptor cloudDesc, final String collection,
-      final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
-      SolrCore core, CoreContainer cc) {
+                                boolean recoverReloadedCores, final boolean isLeader,
+                                final CloudDescriptor cloudDesc, final String collection,
+                                final String shardZkNodeName, String shardId, ZkNodeProps leaderProps,
+                                SolrCore core, CoreContainer cc) {
     if (SKIP_AUTO_RECOVERY) {
       log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
       return false;
     }
     boolean doRecovery = true;
     if (!isLeader) {
-      
+
       if (core.isReloaded() && !recoverReloadedCores) {
         doRecovery = false;
       }
-      
+
       if (doRecovery) {
         log.info("Core needs to recover:" + core.getName());
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
         return true;
       }
-      
+
       // see if the leader told us to recover
       String lirState = getLeaderInitiatedRecoveryState(collection, shardId,
           core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
       if (ZkStateReader.DOWN.equals(lirState)) {
-        log.info("Leader marked core "+core.getName()+" down; starting recovery process");
+        log.info("Leader marked core " + core.getName() + " down; starting recovery process");
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
-        return true;        
+        return true;
       }
     } else {
       log.info("I am the leader, no recovery necessary");
     }
-    
+
     return false;
   }
 
@@ -1087,11 +1102,11 @@ public final class ZkController {
   public void publish(final CoreDescriptor cd, final String state) throws KeeperException, InterruptedException {
     publish(cd, state, true);
   }
-  
+
   public void publish(final CoreDescriptor cd, final String state, boolean updateLastState) throws KeeperException, InterruptedException {
     publish(cd, state, updateLastState, false);
   }
-  
+
   /**
    * Publish core state to overseer.
    */
@@ -1104,81 +1119,92 @@ public final class ZkController {
       }
     }
     String collection = cd.getCloudDescriptor().getCollectionName();
-    log.info("publishing core={} state={} collection={}", cd.getName(), state, collection);
-    //System.out.println(Thread.currentThread().getStackTrace()[3]);
-    Integer numShards = cd.getCloudDescriptor().getNumShards();
-    if (numShards == null) { //XXX sys prop hack
-      log.info("numShards not found on descriptor - reading it from system property");
-      numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
-    }
-    
-    assert collection != null && collection.length() > 0;
-    
-    String shardId = cd.getCloudDescriptor().getShardId();
-    String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();    
-    // If the leader initiated recovery, then verify that this replica has performed
-    // recovery as requested before becoming active; don't even look at lirState if going down
-    if (!ZkStateReader.DOWN.equals(state)) {
-      String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
-      if (lirState != null) {
-        if ("active".equals(state)) {
-          // trying to become active, so leader-initiated state must be recovering
-          if (ZkStateReader.RECOVERING.equals(lirState)) {
-            updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE);
-          } else if (ZkStateReader.DOWN.equals(lirState)) {
-            throw new SolrException(ErrorCode.INVALID_STATE, 
-                "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
-          }
-        } else if (ZkStateReader.RECOVERING.equals(state)) {
-          // if it is currently DOWN, then trying to enter into recovering state is good
-          if (ZkStateReader.DOWN.equals(lirState)) {
-            updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING);
+
+    Map previousMDCContext = MDC.getCopyOfContextMap();
+    MDCUtils.setCollection(collection);
+
+    try {
+      if (cd != null && cd.getName() != null)
+        MDCUtils.setCore(cd.getName());
+      log.info("publishing core={} state={} collection={}", cd.getName(), state, collection);
+      //System.out.println(Thread.currentThread().getStackTrace()[3]);
+      Integer numShards = cd.getCloudDescriptor().getNumShards();
+      if (numShards == null) { //XXX sys prop hack
+        log.info("numShards not found on descriptor - reading it from system property");
+        numShards = Integer.getInteger(ZkStateReader.NUM_SHARDS_PROP);
+      }
+
+      assert collection != null && collection.length() > 0;
+
+      String shardId = cd.getCloudDescriptor().getShardId();
+      MDCUtils.setShard(shardId);
+      String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+      // If the leader initiated recovery, then verify that this replica has performed
+      // recovery as requested before becoming active; don't even look at lirState if going down
+      if (!ZkStateReader.DOWN.equals(state)) {
+        String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
+        if (lirState != null) {
+          if (ZkStateReader.ACTIVE.equals(state)) {
+            // trying to become active, so leader-initiated state must be recovering
+            if (ZkStateReader.RECOVERING.equals(lirState)) {
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true);
+            } else if (ZkStateReader.DOWN.equals(lirState)) {
+              throw new SolrException(ErrorCode.INVALID_STATE,
+                  "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
+            }
+          } else if (ZkStateReader.RECOVERING.equals(state)) {
+            // if it is currently DOWN, then trying to enter into recovering state is good
+            if (ZkStateReader.DOWN.equals(lirState)) {
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true);
+            }
           }
         }
       }
-    }
-    
-    Map<String, Object> props = new HashMap<String, Object>();
-    props.put(Overseer.QUEUE_OPERATION, "state");
-    props.put(ZkStateReader.STATE_PROP, state);
-    props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
-    props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
-    props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
-    props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-    props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
-    props.put(ZkStateReader.COLLECTION_PROP, collection);
-    if (numShards != null) {
-      props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
-    }
-    if (coreNodeName != null) {
-      props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
-    }
-    
-    if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) { 
-      try (SolrCore core = cc.getCore(cd.getName())) {
-        if (core != null && core.getDirectoryFactory().isSharedStorage()) {
-          props.put("dataDir", core.getDataDir());
-          UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-          if (ulog != null) {
-            props.put("ulogDir", ulog.getLogDir());
+
+      Map<String, Object> props = new HashMap<>();
+      props.put(Overseer.QUEUE_OPERATION, "state");
+      props.put(ZkStateReader.STATE_PROP, state);
+      props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
+      props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
+      props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
+      props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+      props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
+      props.put(ZkStateReader.COLLECTION_PROP, collection);
+      if (numShards != null) {
+        props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
+      }
+      if (coreNodeName != null) {
+        props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+      }
+
+      if (ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection)) {
+        try (SolrCore core = cc.getCore(cd.getName())) {
+          if (core != null && core.getDirectoryFactory().isSharedStorage()) {
+            props.put("dataDir", core.getDataDir());
+            UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+            if (ulog != null) {
+              props.put("ulogDir", ulog.getLogDir());
+            }
           }
         }
       }
+
+      ZkNodeProps m = new ZkNodeProps(props);
+
+      if (updateLastState) {
+        cd.getCloudDescriptor().lastPublished = state;
+      }
+      overseerJobQueue.offer(ZkStateReader.toJSON(m));
+    } finally {
+      MDCUtils.cleanupMDC(previousMDCContext);
     }
-    
-    ZkNodeProps m = new ZkNodeProps(props);
-    
-    if (updateLastState) {
-      cd.getCloudDescriptor().lastPublished = state;
-    }
-    overseerJobQueue.offer(ZkStateReader.toJSON(m));
   }
-  
+
   private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
-      final ClusterState state, final String coreNodeName) {
+                                           final ClusterState state, final String coreNodeName) {
 
     final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
-    
+
     final String shardId = state.getShardId(getNodeName(), desc.getName());
 
     if (shardId != null) {
@@ -1193,14 +1219,14 @@ public final class ZkController {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
     assert collection != null;
-    
+
     if (collection == null || collection.trim().length() == 0) {
       log.error("No collection was specified.");
       return;
     }
-    
+
     ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
-    
+
     if (context != null) {
       context.cancelElection();
     }
@@ -1209,7 +1235,7 @@ public final class ZkController {
     boolean removeWatch = true;
     // if there is no SolrCore which is a member of this collection, remove the watch
     for (SolrCore solrCore : cc.getCores()) {
-      if (((ZkSolrResourceLoader)solrCore.getResourceLoader()).getConfigSetZkPath().equals(configLocation))
+      if (((ZkSolrResourceLoader) solrCore.getResourceLoader()).getConfigSetZkPath().equals(configLocation))
         configLocation = null; //if a core uses this config dir , then set it to null
 
 
@@ -1217,7 +1243,7 @@ public final class ZkController {
           .getCloudDescriptor();
       if (cloudDesc != null
           && cloudDescriptor.getCollectionName().equals(
-              cloudDesc.getCollectionName())) {
+          cloudDesc.getCollectionName())) {
         removeWatch = false;
         break;
       }
@@ -1230,14 +1256,14 @@ public final class ZkController {
         ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
     overseerJobQueue.offer(ZkStateReader.toJSON(m));
 
-    if(configLocation != null) {
+    if (configLocation != null) {
       synchronized (confDirectoryListeners) {
-        log.info("This conf directory is no more watched {0}",configLocation);
+        log.info("This conf directory is no more watched {}", configLocation);
         confDirectoryListeners.remove(configLocation);
       }
     }
   }
-  
+
   public void createCollection(String collection) throws KeeperException,
       InterruptedException {
     ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
@@ -1253,20 +1279,20 @@ public final class ZkController {
 
   public void createCollectionZkNode(CloudDescriptor cd) {
     String collection = cd.getCollectionName();
-    
+
     log.info("Check for collection zkNode:" + collection);
     String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
-    
+
     try {
-      if(!zkClient.exists(collectionPath, true)) {
+      if (!zkClient.exists(collectionPath, true)) {
         log.info("Creating collection in ZooKeeper:" + collection);
-       SolrParams params = cd.getParams();
+        SolrParams params = cd.getParams();
 
         try {
-          Map<String,Object> collectionProps = new HashMap<>();
+          Map<String, Object> collectionProps = new HashMap<>();
 
           // TODO: if collection.configName isn't set, and there isn't already a conf in zk, just use that?
-          String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX+CONFIGNAME_PROP, collection);
+          String defaultConfigName = System.getProperty(COLLECTION_PARAM_PREFIX + CONFIGNAME_PROP, collection);
 
           // params passed in - currently only done via core admin (create core commmand).
           if (params != null) {
@@ -1283,8 +1309,8 @@ public final class ZkController {
               // TODO: getting the configName from the collectionPath should fail since we already know it doesn't exist?
               getConfName(collection, collectionPath, collectionProps);
             }
-            
-          } else if(System.getProperty("bootstrap_confdir") != null) {
+
+          } else if (System.getProperty("bootstrap_confdir") != null) {
             // if we are bootstrapping a collection, default the config for
             // a new collection to the collection we are bootstrapping
             log.info("Setting config for collection:" + collection + " to " + defaultConfigName);
@@ -1292,17 +1318,17 @@ public final class ZkController {
             Properties sysProps = System.getProperties();
             for (String sprop : System.getProperties().stringPropertyNames()) {
               if (sprop.startsWith(COLLECTION_PARAM_PREFIX)) {
-                collectionProps.put(sprop.substring(COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));                
+                collectionProps.put(sprop.substring(COLLECTION_PARAM_PREFIX.length()), sysProps.getProperty(sprop));
               }
             }
-            
+
             // if the config name wasn't passed in, use the default
             if (!collectionProps.containsKey(CONFIGNAME_PROP))
-              collectionProps.put(CONFIGNAME_PROP,  defaultConfigName);
+              collectionProps.put(CONFIGNAME_PROP, defaultConfigName);
 
           } else if (Boolean.getBoolean("bootstrap_conf")) {
             // the conf name should should be the collection name of this core
-            collectionProps.put(CONFIGNAME_PROP,  cd.getCollectionName());
+            collectionProps.put(CONFIGNAME_PROP, cd.getCollectionName());
           } else {
             getConfName(collection, collectionPath, collectionProps);
           }
@@ -1321,24 +1347,23 @@ public final class ZkController {
       } else {
         log.info("Collection zkNode exists");
       }
-      
+
     } catch (KeeperException e) {
       // it's okay if another beats us creating the node
       if (e.code() == KeeperException.Code.NODEEXISTS) {
         return;
       }
       throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
-    }
-    catch (InterruptedException e) {
+    } catch (InterruptedException e) {
       Thread.interrupted();
       throw new SolrException(ErrorCode.SERVER_ERROR, "Error creating collection node in Zookeeper", e);
     }
-    
+
   }
 
 
   private void getConfName(String collection, String collectionPath,
-      Map<String,Object> collectionProps) throws KeeperException,
+                           Map<String, Object> collectionProps) throws KeeperException,
       InterruptedException {
     // check for configName
     log.info("Looking for collection configName");
@@ -1352,7 +1377,7 @@ public final class ZkController {
           break;
         }
       }
-     
+
       // if there is only one conf, use that
       try {
         configNames = zkClient.getChildren(ZkConfigManager.CONFIGS_ZKNODE, null,
@@ -1363,16 +1388,16 @@ public final class ZkController {
       if (configNames != null && configNames.size() == 1) {
         // no config set named, but there is only 1 - use it
         log.info("Only one config set found in zk - using it:" + configNames.get(0));
-        collectionProps.put(CONFIGNAME_PROP,  configNames.get(0));
+        collectionProps.put(CONFIGNAME_PROP, configNames.get(0));
         break;
       }
-      
+
       if (configNames != null && configNames.contains(collection)) {
         log.info("Could not find explicit collection configName, but found config name matching collection name - using that set.");
-        collectionProps.put(CONFIGNAME_PROP,  collection);
+        collectionProps.put(CONFIGNAME_PROP, collection);
         break;
       }
-      
+
       log.info("Could not find collection configName - pausing for 3 seconds and trying again - try: " + retry);
       Thread.sleep(3000);
     }
@@ -1383,7 +1408,7 @@ public final class ZkController {
           "Could not find configName for collection " + collection + " found:" + configNames);
     }
   }
-  
+
   public ZkStateReader getZkStateReader() {
     return zkStateReader;
   }
@@ -1404,17 +1429,17 @@ public final class ZkController {
     int retryCount = 320;
     log.info("look for our core node name");
     while (retryCount-- > 0) {
-      Map<String,Slice> slicesMap = zkStateReader.getClusterState()
+      Map<String, Slice> slicesMap = zkStateReader.getClusterState()
           .getSlicesMap(descriptor.getCloudDescriptor().getCollectionName());
       if (slicesMap != null) {
-        
+
         for (Slice slice : slicesMap.values()) {
           for (Replica replica : slice.getReplicas()) {
             // TODO: for really large clusters, we could 'index' on this
-            
+
             String nodeName = replica.getStr(ZkStateReader.NODE_NAME_PROP);
             String core = replica.getStr(ZkStateReader.CORE_NAME_PROP);
-            
+
             String msgNodeName = getNodeName();
             String msgCore = descriptor.getName();
 
@@ -1449,23 +1474,23 @@ public final class ZkController {
         Thread.currentThread().interrupt();
       }
     }
-    
+
     throw new SolrException(ErrorCode.SERVER_ERROR,
         "Could not get shard id for core: " + cd.getName());
   }
 
 
-  public String getCoreNodeName(CoreDescriptor descriptor){
+  public String getCoreNodeName(CoreDescriptor descriptor) {
     String coreNodeName = descriptor.getCloudDescriptor().getCoreNodeName();
     if (coreNodeName == null && !genericCoreNodeNames) {
       // it's the default
       return getNodeName() + "_" + descriptor.getName();
     }
-    
+
     return coreNodeName;
   }
 
-  public void preRegister(CoreDescriptor cd ) {
+  public void preRegister(CoreDescriptor cd) {
 
     String coreNodeName = getCoreNodeName(cd);
 
@@ -1483,8 +1508,8 @@ public final class ZkController {
 
       publish(cd, ZkStateReader.DOWN, false, true);
       DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(cd.getCloudDescriptor().getCollectionName());
-      if(collection !=null && collection.getStateFormat()>1  ){
-        log.info("Registering watch for external collection {}",cd.getCloudDescriptor().getCollectionName());
+      if (collection != null && collection.getStateFormat() > 1) {
+        log.info("Registering watch for external collection {}", cd.getCloudDescriptor().getCollectionName());
         zkStateReader.addCollectionWatch(cd.getCloudDescriptor().getCollectionName());
       }
     } catch (KeeperException e) {
@@ -1495,7 +1520,7 @@ public final class ZkController {
       log.error("", e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
     }
-    
+
     if (cd.getCloudDescriptor().getShardId() == null && needsToBeAssignedShardId(cd, zkStateReader.getClusterState(), coreNodeName)) {
       doGetShardIdAndNodeNameProcess(cd);
     } else {
@@ -1510,21 +1535,33 @@ public final class ZkController {
       CloudDescriptor cloudDesc = cd.getCloudDescriptor();
       String coreNodeName = cloudDesc.getCoreNodeName();
       assert coreNodeName != null;
-      if (cloudDesc.getShardId() == null) throw new SolrException(ErrorCode.SERVER_ERROR ,"No shard id for :" + cd);
+      if (cloudDesc.getShardId() == null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "No shard id for :" + cd);
+      }
       long endTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(3, TimeUnit.SECONDS);
-      String errMessage= null;
-      for (; System.nanoTime()<endTime; ) {
-        Thread.sleep(100);
-        errMessage = null;
+      String errMessage = null;
+      while (System.nanoTime() < endTime) {
         Slice slice = zkStateReader.getClusterState().getSlice(cd.getCollectionName(), cloudDesc.getShardId());
         if (slice == null) {
           errMessage = "Invalid slice : " + cloudDesc.getShardId();
+          Thread.sleep(100); // back off before re-polling; without this the loop busy-spins until endTime
           continue;
         }
-        if (slice.getReplica(coreNodeName) != null) return;
+        if (slice.getReplica(coreNodeName) != null) {
+          Replica replica = slice.getReplica(coreNodeName);
+          String baseUrl = replica.getStr(BASE_URL_PROP);
+          String coreName = replica.getStr(CORE_NAME_PROP);
+          if (baseUrl.equals(this.baseURL) && coreName.equals(cd.getName())) {
+            return;
+          }
+          errMessage = "replica with coreNodeName " + coreNodeName + " exists but with a different name or base_url";
+        }
+        Thread.sleep(100);
       }
-      if(errMessage == null)  errMessage = " no_such_replica in clusterstate ,replicaName :  " + coreNodeName;
-      throw new SolrException(ErrorCode.SERVER_ERROR,errMessage + "state : "+ zkStateReader.getClusterState().getCollection(cd.getCollectionName()));
+      if (errMessage == null) {
+        errMessage = "replica " + coreNodeName + " is not present in cluster state";
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR, errMessage + ". state : " + zkStateReader.getClusterState().getCollection(cd.getCollectionName()));
     }
   }
 
@@ -1534,7 +1571,7 @@ public final class ZkController {
     String collection = cloudDesc.getCollectionName();
     String shard = cloudDesc.getShardId();
     ZkCoreNodeProps leaderProps = null;
-    
+
     int retries = 6;
     for (int i = 0; i < retries; i++) {
       try {
@@ -1542,7 +1579,7 @@ public final class ZkController {
           throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
               "We have been closed");
         }
-        
+
         // go straight to zk, not the cloud state - we must have current info
         leaderProps = getLeaderProps(collection, shard, 30000);
         break;
@@ -1565,28 +1602,28 @@ public final class ZkController {
     String myCoreNodeName = cloudDesc.getCoreNodeName();
     String myCoreName = descriptor.getName();
     String ourUrl = ZkCoreNodeProps.getCoreUrl(getBaseUrl(), myCoreName);
-    
+
     boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
     if (!isLeader && !SKIP_AUTO_RECOVERY) {
-      
+
       // detect if this core is in leader-initiated recovery and if so, 
       // then we don't need the leader to wait on seeing the down state
       String lirState = null;
       try {
         lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreNodeName);
       } catch (Exception exc) {
-        log.error("Failed to determine if replica "+myCoreNodeName+
-            " is in leader-initiated recovery due to: "+exc, exc);
+        log.error("Failed to determine if replica " + myCoreNodeName +
+            " is in leader-initiated recovery due to: " + exc, exc);
       }
-      
+
       if (lirState != null) {
-        log.info("Replica "+myCoreNodeName+
+        log.info("Replica " + myCoreNodeName +
             " is already in leader-initiated recovery, so not waiting for leader to see down state.");
       } else {
-        
-        log.info("Replica "+myCoreNodeName+
+
+        log.info("Replica " + myCoreNodeName +
             " NOT in leader-initiated recovery, need to wait for leader to see down state.");
-            
+
         try (HttpSolrClient client = new HttpSolrClient(leaderBaseUrl)) {
           client.setConnectionTimeout(15000);
           client.setSoTimeout(120000);
@@ -1595,7 +1632,7 @@ public final class ZkController {
           prepCmd.setNodeName(getNodeName());
           prepCmd.setCoreNodeName(coreZkNodeName);
           prepCmd.setState(ZkStateReader.DOWN);
-          
+
           // let's retry a couple times - perhaps the leader just went down,
           // or perhaps he is just not quite ready for us yet
           retries = 6;
@@ -1620,8 +1657,8 @@ public final class ZkController {
                 // if there was a communication error talking to the leader, see if the leader is even alive
                 if (!zkStateReader.getClusterState().liveNodesContain(leaderProps.getNodeName())) {
                   throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
-                      "Node "+leaderProps.getNodeName()+" hosting leader for "+
-                          shard+" in "+collection+" is not live!");
+                      "Node " + leaderProps.getNodeName() + " hosting leader for " +
+                          shard + " in " + collection + " is not live!");
                 }
               }
 
@@ -1645,7 +1682,7 @@ public final class ZkController {
     }
     return leaderProps;
   }
-  
+
   public static void linkConfSet(SolrZkClient zkClient, String collection, String confSetName) throws KeeperException, InterruptedException {
     String path = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
     if (log.isInfoEnabled()) {
@@ -1675,21 +1712,21 @@ public final class ZkController {
     }
     // we found existing data, let's update it
     ZkNodeProps props = null;
-    if(data != null) {
+    if (data != null) {
       props = ZkNodeProps.load(data);
-      Map<String,Object> newProps = new HashMap<>();
+      Map<String, Object> newProps = new HashMap<>();
       newProps.putAll(props.getProperties());
       newProps.put(CONFIGNAME_PROP, confSetName);
       props = new ZkNodeProps(newProps);
     } else {
       props = new ZkNodeProps(CONFIGNAME_PROP, confSetName);
     }
-    
+
     // TODO: we should consider using version
     zkClient.setData(path, ZkStateReader.toJSON(props), true);
 
   }
-  
+
   /**
    * If in SolrCloud mode, upload config sets for each SolrCore in solr.xml.
    */
@@ -1699,7 +1736,7 @@ public final class ZkController {
 
     //List<String> allCoreNames = cfg.getAllCoreNames();
     List<CoreDescriptor> cds = cc.getCoresLocator().discover(cc);
-    
+
     log.info("bootstrapping config for " + cds.size() + " cores into ZooKeeper using solr.xml from " + solrHome);
 
     for (CoreDescriptor cd : cds) {
@@ -1733,7 +1770,7 @@ public final class ZkController {
   public DistributedMap getOverseerFailureMap() {
     return overseerFailureMap;
   }
-  
+
   public int getClientTimeout() {
     return clientTimeout;
   }
@@ -1745,12 +1782,12 @@ public final class ZkController {
   public LeaderElector getOverseerElector() {
     return overseerElector;
   }
-  
+
   /**
    * Returns the nodeName that should be used based on the specified properties.
    *
-   * @param hostName - must not be null or the empty string
-   * @param hostPort - must consist only of digits, must not be null or the empty string
+   * @param hostName    - must not be null or the empty string
+   * @param hostPort    - must consist only of digits, must not be null or the empty string
    * @param hostContext - should not begin or end with a slash (leading/trailin slashes will be ignored), must not be null, may be the empty string to denote the root context
    * @lucene.experimental
    * @see ZkStateReader#getBaseUrlForNodeName
@@ -1759,56 +1796,56 @@ public final class ZkController {
                                  final String hostPort,
                                  final String hostContext) {
     try {
-      return hostName + ':' + hostPort + '_' + 
-        URLEncoder.encode(trimLeadingAndTrailingSlashes(hostContext), "UTF-8");
+      return hostName + ':' + hostPort + '_' +
+          URLEncoder.encode(trimLeadingAndTrailingSlashes(hostContext), "UTF-8");
     } catch (UnsupportedEncodingException e) {
       throw new Error("JVM Does not seem to support UTF-8", e);
     }
   }
-  
+
   /**
-   * Utility method for trimming and leading and/or trailing slashes from 
-   * its input.  May return the empty string.  May return null if and only 
+   * Utility method for trimming leading and/or trailing slashes from
+   * its input.  May return the empty string.  May return null if and only
    * if the input is null.
    */
   public static String trimLeadingAndTrailingSlashes(final String in) {
     if (null == in) return in;
-    
+
     String out = in;
     if (out.startsWith("/")) {
       out = out.substring(1);
     }
     if (out.endsWith("/")) {
-      out = out.substring(0,out.length()-1);
+      out = out.substring(0, out.length() - 1);
     }
     return out;
   }
 
   public void rejoinOverseerElection(String electionNode, boolean joinAtHead) {
     try {
-      if(electionNode !=null){
+      if (electionNode != null) {
         //this call is from inside the JVM  . not from CoreAdminHandler
-        if(overseerElector.getContext() == null || overseerElector.getContext().leaderSeqPath == null){
+        if (overseerElector.getContext() == null || overseerElector.getContext().leaderSeqPath == null) {
           overseerElector.retryElection(new OverseerElectionContext(zkClient,
               overseer, getNodeName()), joinAtHead);
           return;
         }
-        if(!overseerElector.getContext().leaderSeqPath.endsWith(electionNode)){
-          log.warn("Asked to rejoin with wrong election node : {}, current node is {}",electionNode, overseerElector.getContext().leaderSeqPath);
+        if (!overseerElector.getContext().leaderSeqPath.endsWith(electionNode)) {
+          log.warn("Asked to rejoin with wrong election node : {}, current node is {}", electionNode, overseerElector.getContext().leaderSeqPath);
           //however delete it . This is possible when the last attempt at deleting the election node failed.
-          if(electionNode.startsWith(getNodeName())){
+          if (electionNode.startsWith(getNodeName())) {
             try {
-              zkClient.delete(OverseerElectionContext.PATH+LeaderElector.ELECTION_NODE+"/"+electionNode,-1,true);
+              zkClient.delete(OverseerElectionContext.PATH + LeaderElector.ELECTION_NODE + "/" + electionNode, -1, true);
             } catch (NoNodeException e) {
               //no problem
-            } catch (InterruptedException e){
+            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
-            } catch(Exception e) {
-              log.warn("Old election node exists , could not be removed ",e);
+            } catch (Exception e) {
+              log.warn("Old election node exists , could not be removed ", e);
             }
           }
         }
-      }else {
+      } else {
         overseerElector.retryElection(overseerElector.getContext(), joinAtHead);
       }
     } catch (Exception e) {
@@ -1845,50 +1882,52 @@ public final class ZkController {
   public void checkOverseerDesignate() {
     try {
       byte[] data = zkClient.getData(ZkStateReader.ROLES, null, new Stat(), true);
-      if(data ==null) return;
+      if (data == null) return;
       Map roles = (Map) ZkStateReader.fromJSON(data);
-      if(roles ==null) return;
-      List nodeList= (List) roles.get("overseer");
-      if(nodeList == null) return;
-      if(nodeList.contains(getNodeName())){
+      if (roles == null) return;
+      List nodeList = (List) roles.get("overseer");
+      if (nodeList == null) return;
+      if (nodeList.contains(getNodeName())) {
         ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDROLE.toString().toLowerCase(Locale.ROOT),
             "node", getNodeName(),
             "role", "overseer");
-        log.info("Going to add role {} ",props);
+        log.info("Going to add role {} ", props);
         getOverseerCollectionQueue().offer(ZkStateReader.toJSON(props));
       }
-    } catch (NoNodeException nne){
+    } catch (NoNodeException nne) {
       return;
     } catch (Exception e) {
-      log.warn("could not readd the overseer designate ",e);
+      log.warn("could not readd the overseer designate ", e);
     }
   }
 
-  CoreContainer getCoreContainer(){
+  CoreContainer getCoreContainer() {
     return cc;
   }
-      
+
   /**
    * When a leader receives a communication error when trying to send a request to a replica,
    * it calls this method to ensure the replica enters recovery when connectivity is restored.
-   * 
+   * <p>
    * returns true if the node hosting the replica is still considered "live" by ZooKeeper;
    * false means the node is not live either, so no point in trying to send recovery commands
    * to it.
    */
-  public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection, 
-      final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState) 
-          throws KeeperException, InterruptedException 
-  {    
+  public boolean ensureReplicaInLeaderInitiatedRecovery(
+      final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
+      String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    final String replicaUrl = replicaCoreProps.getCoreUrl();
+
     if (collection == null)
-      throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
+      throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
 
     if (shardId == null)
-      throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
-    
+      throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
+
     if (replicaUrl == null)
       throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");
-    
+
     // First, determine if this replica is already in recovery handling
     // which is needed because there can be many concurrent errors flooding in
     // about the same replica having trouble and we only need to send the "needs"
@@ -1896,10 +1935,10 @@ public final class ZkController {
     boolean nodeIsLive = true;
     boolean publishDownState = false;
     String replicaNodeName = replicaCoreProps.getNodeName();
-    String replicaCoreNodeName = ((Replica)replicaCoreProps.getNodeProps()).getName();
-    assert replicaCoreNodeName != null : "No core name for replica "+replicaNodeName;
+    String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
+    assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
     synchronized (replicasInLeaderInitiatedRecovery) {
-      if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {     
+      if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
         if (!forcePublishState) {
           log.debug("Replica {} already in leader-initiated recovery handling.", replicaUrl);
           return false; // already in this recovery process
@@ -1910,12 +1949,12 @@ public final class ZkController {
       // we only really need to try to send the recovery command if the node itself is "live"
       if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
         // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN);
+        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName, retryOnConnLoss);
         replicasInLeaderInitiatedRecovery.put(replicaUrl,
             getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
-        log.info("Put replica core={} coreNodeName={} on "+
-          replicaNodeName+" into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
-        publishDownState = true;        
+        log.info("Put replica core={} coreNodeName={} on " +
+            replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
+        publishDownState = true;
       } else {
         nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
         log.info("Node " + replicaNodeName +
@@ -1923,23 +1962,23 @@ public final class ZkController {
             replicaCoreProps.getCoreName(), replicaCoreNodeName);
         // publishDownState will be false to avoid publishing the "down" state too many times
         // as many errors can occur together and will each call into this method (SOLR-6189)        
-      }      
-    }    
-    
+      }
+    }
+
     if (publishDownState || forcePublishState) {
-      String replicaCoreName = replicaCoreProps.getCoreName();    
-      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state", 
-          ZkStateReader.STATE_PROP, ZkStateReader.DOWN, 
-          ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(), 
+      String replicaCoreName = replicaCoreProps.getCoreName();
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+          ZkStateReader.STATE_PROP, ZkStateReader.DOWN,
+          ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
           ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
           ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
           ZkStateReader.SHARD_ID_PROP, shardId,
           ZkStateReader.COLLECTION_PROP, collection);
-      log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? "+forcePublishState,
+      log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? " + forcePublishState,
           replicaCoreName, replicaCoreNodeName, ZkStateReader.DOWN, replicaUrl);
-      overseerJobQueue.offer(ZkStateReader.toJSON(m));      
+      overseerJobQueue.offer(ZkStateReader.toJSON(m));
     }
-    
+
     return nodeIsLive;
   }
 
@@ -1950,23 +1989,23 @@ public final class ZkController {
     }
     return exists;
   }
-  
+
   public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
-    synchronized(replicasInLeaderInitiatedRecovery) {
-      replicasInLeaderInitiatedRecovery.remove(replicaUrl);           
+    synchronized (replicasInLeaderInitiatedRecovery) {
+      replicasInLeaderInitiatedRecovery.remove(replicaUrl);
     }
-  }  
-  
+  }
+
   public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
-    Map<String,Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
-    return (stateObj != null) ? (String)stateObj.get("state") : null;
+    Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+    return (stateObj != null) ? (String) stateObj.get("state") : null;
   }
 
-  public Map<String,Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
+  public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
 
     if (collection == null || shardId == null || coreNodeName == null)
       return null; // if we don't have complete data about a core in cloud mode, return null
-    
+
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
     byte[] stateData = null;
     try {
@@ -1976,26 +2015,26 @@ public final class ZkController {
     } catch (ConnectionLossException | SessionExpiredException cle) {
       // sort of safe to ignore ??? Usually these are seen when the core is going down
       // or there are bigger issues to deal with than reading this znode
-      log.warn("Unable to read "+znodePath+" due to: "+cle);
+      log.warn("Unable to read " + znodePath + " due to: " + cle);
     } catch (Exception exc) {
-      log.error("Failed to read data from znode "+znodePath+" due to: "+exc);
+      log.error("Failed to read data from znode " + znodePath + " due to: " + exc);
       if (exc instanceof SolrException) {
-        throw (SolrException)exc;
+        throw (SolrException) exc;
       } else {
-        throw new SolrException(ErrorCode.SERVER_ERROR, 
-            "Failed to read data from znodePath: "+znodePath, exc);
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Failed to read data from znodePath: " + znodePath, exc);
       }
     }
 
-    Map<String,Object> stateObj = null;
+    Map<String, Object> stateObj = null;
     if (stateData != null && stateData.length > 0) {
       // TODO: Remove later ... this is for upgrading from 4.8.x to 4.10.3 (see: SOLR-6732)
-      if (stateData[0] == (byte)'{') {
+      if (stateData[0] == (byte) '{') {
         Object parsedJson = ZkStateReader.fromJSON(stateData);
         if (parsedJson instanceof Map) {
-          stateObj = (Map<String,Object>)parsedJson;
+          stateObj = (Map<String, Object>) parsedJson;
         } else {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Leader-initiated recovery state data is invalid! "+parsedJson);
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Leader-initiated recovery state data is invalid! " + parsedJson);
         }
       } else {
         // old format still in ZK
@@ -2005,27 +2044,28 @@ public final class ZkController {
 
     return stateObj;
   }
-  
-  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
+
+  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
+                                                  String leaderCoreNodeName, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
-      log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
-          "; shardId="+shardId+"; coreNodeName="+coreNodeName);
+      log.warn("Cannot set leader-initiated recovery state znode to " + state + " using: collection=" + collection +
+          "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
       return; // if we don't have complete data about a core in cloud mode, do nothing
     }
 
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
-    
+
     if (ZkStateReader.ACTIVE.equals(state)) {
       // since we're marking it active, we don't need this znode anymore, so delete instead of update
       try {
         zkClient.delete(znodePath, -1, false);
       } catch (Exception justLogIt) {
-        log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
+        log.warn("Failed to delete znode " + znodePath, justLogIt);
       }
       return;
     }
 
-    Map<String,Object> stateObj = null;
+    Map<String, Object> stateObj = null;
     try {
       stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
     } catch (Exception exc) {
@@ -2040,44 +2080,82 @@ public final class ZkController {
       stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
 
     byte[] znodeData = ZkStateReader.toJSON(stateObj);
-    boolean retryOnConnLoss = true; // be a little more robust when trying to write data
+
     try {
-      if (zkClient.exists(znodePath, retryOnConnLoss)) {
-        zkClient.setData(znodePath, znodeData, retryOnConnLoss);
+      if (ZkStateReader.DOWN.equals(state)) {
+        markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
       } else {
-        zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+        if (zkClient.exists(znodePath, true)) {
+          zkClient.setData(znodePath, znodeData, true);
+        } else {
+          zkClient.makePath(znodePath, znodeData, true);
+        }
       }
-      log.info("Wrote "+state+" to "+znodePath);
+      log.info("Wrote " + state + " to " + znodePath);
     } catch (Exception exc) {
       if (exc instanceof SolrException) {
-        throw (SolrException)exc;
+        throw (SolrException) exc;
       } else {
-        throw new SolrException(ErrorCode.SERVER_ERROR, 
-            "Failed to update data to "+state+" for znode: "+znodePath, exc);        
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Failed to update data to " + state + " for znode: " + znodePath, exc);
       }
     }
   }
-  
+
+  /**
+   * We use ZK's multi-transactional semantics to ensure that we are able to
+   * publish a replica as 'down' only if our leader election node still exists
+   * in ZK. This ensures that a long-running network partition (or a pause caused
+   * by GC, etc.) doesn't let us mark a node as down *after* we've already lost our session.
+   */
+  private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
+                                       String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+    String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
+    if (leaderSeqPath == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Failed to update data to 'down' for znode: " + znodePath +
+              " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
+    }
+    if (zkClient.exists(znodePath, true)) {
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.setData(znodePath, znodeData, -1));
+      zkClient.multi(ops, true);
+    } else {
+      String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
+      try {
+        zkClient.makePath(parentZNodePath, true);
+      } catch (KeeperException.NodeExistsException nee) {
+        // if it exists, that's great!
+      }
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
+          CreateMode.PERSISTENT));
+      zkClient.multi(ops, true);
+    }
+  }
+
   public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
-    return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
-  }  
-  
+    return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
+  }
+
   public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
-    return getLeaderInitiatedRecoveryZnodePath(collection, shardId)+"/"+coreNodeName;
+    return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
   }
-  
+
   public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
-      ClusterState clusterState = getZkStateReader().getClusterState();
-      if (clusterState != null) {
-        DocCollection collection = clusterState.getCollectionOrNull(desc
-            .getCloudDescriptor().getCollectionName());
-        if (collection != null) {
-          boolean autoAddReplicas = ClusterStateUtil.isAutoAddReplicas( getZkStateReader(), collection.getName());   
-          if (autoAddReplicas) {
-            CloudUtil.checkSharedFSFailoverReplaced(cc, desc);
-          }
+    ClusterState clusterState = getZkStateReader().getClusterState();
+    if (clusterState != null) {
+      DocCollection collection = clusterState.getCollectionOrNull(desc
+          .getCloudDescriptor().getCollectionName());
+      if (collection != null) {
+        boolean autoAddReplicas = ClusterStateUtil.isAutoAddReplicas(getZkStateReader(), collection.getName());
+        if (autoAddReplicas) {
+          CloudUtil.checkSharedFSFailoverReplaced(cc, desc);
         }
       }
+    }
   }
 
   /**
@@ -2122,8 +2200,8 @@ public final class ZkController {
               log.warn("could not get stat");
             }
 
-            log.info(MessageFormat.format(errMsg, resourceLocation, znodeVersion));
-            throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg, resourceLocation, znodeVersion) + ", retry.");
+            log.info(StrUtils.formatString(errMsg, resourceLocation, znodeVersion));
+            throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry.");
           }
         }
       }
@@ -2137,8 +2215,8 @@ public final class ZkController {
         log.error(e.getMessage());
 
       }
-      log.info(MessageFormat.format(errMsg + " zkVersion= " + v, resourceLocation, znodeVersion));
-      throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, MessageFormat.format(errMsg, resourceLocation, znodeVersion) + ", retry.");
+      log.info(StrUtils.formatString(errMsg + " zkVersion= " + v, resourceLocation, znodeVersion));
+      throw new ResourceModifiedInZkException(ErrorCode.CONFLICT, StrUtils.formatString(errMsg, resourceLocation, znodeVersion) + ", retry.");
     } catch (ResourceModifiedInZkException e) {
       throw e;
     } catch (Exception e) {
@@ -2167,18 +2245,18 @@ public final class ZkController {
     }
   }
 
-  public static  class ResourceModifiedInZkException extends SolrException {
+  public static class ResourceModifiedInZkException extends SolrException {
     public ResourceModifiedInZkException(ErrorCode code, String msg) {
       super(code, msg);
     }
   }
 
   public void unRegisterConfListener(Runnable listener) {
-    if(listener == null) return;
-    synchronized (confDirectoryListeners){
+    if (listener == null) return;
+    synchronized (confDirectoryListeners) {
       for (Set<Runnable> listeners : confDirectoryListeners.values()) {
-        if(listeners != null) {
-          if(listeners.remove(listener)) {
+        if (listeners != null) {
+          if (listeners.remove(listener)) {
             log.info(" a listener was removed because of core close");
           }
         }
@@ -2187,15 +2265,16 @@ public final class ZkController {
 
   }
 
-  /**This will give a callback to the listener whenever a child is modified in the
+  /**
+   * This will give a callback to the listener whenever a child is modified in the
    * conf directory. It is the responsibility of the listener to check if the individual
    * item of interest has been modified.  When the last core which was interested in
    * this conf directory is gone the listeners will be removed automatically.
    */
-  public void registerConfListenerForCore(String confDir,SolrCore core, final Runnable listener){
-    if(listener==null) throw new NullPointerException("listener cannot be null");
-    synchronized (confDirectoryListeners){
-      if(confDirectoryListeners.containsKey(confDir)){
+  public void registerConfListenerForCore(String confDir, SolrCore core, final Runnable listener) {
+    if (listener == null) throw new NullPointerException("listener cannot be null");
+    synchronized (confDirectoryListeners) {
+      if (confDirectoryListeners.containsKey(confDir)) {
         confDirectoryListeners.get(confDir).add(listener);
         core.addCloseHook(new CloseHook() {
           @Override
@@ -2204,76 +2283,95 @@ public final class ZkController {
           }
 
           @Override
-          public void postClose(SolrCore core) { }
+          public void postClose(SolrCore core) {
+          }
         });
       } else {
-        throw new SolrException(ErrorCode.SERVER_ERROR,"This conf directory is not valid");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "This conf directory is not valid");
       }
     }
   }
 
-  private final Map<String , Set<Runnable>> confDirectoryListeners =  new HashMap<>();
+  private final Map<String, Set<Runnable>> confDirectoryListeners = new HashMap<>();
 
   void watchZKConfDir(final String zkDir) {
-    log.info("watch zkdir " + zkDir);
+    log.info("watch zkdir {}" , zkDir);
     if (!confDirectoryListeners.containsKey(zkDir)) {
-      confDirectoryListeners.put(zkDir,  new HashSet<Runnable>());
-      setConfWatcher(zkDir, new WatcherImpl(zkDir));
-
+      confDirectoryListeners.put(zkDir, new HashSet<>());
+      setConfWatcher(zkDir, new WatcherImpl(zkDir), null);
     }
-
-
   }
-  private class WatcherImpl implements Watcher{
-    private final String zkDir ;
+
+  private class WatcherImpl implements Watcher {
+    private final String zkDir;
 
     private WatcherImpl(String dir) {
       this.zkDir = dir;
     }
 
     @Override
-      public void process(WatchedEvent event) {
-        try {
+    public void process(WatchedEvent event) {
+      Stat stat = null;
+      try {
+        stat = zkClient.exists(zkDir, null, true);
+      } catch (KeeperException e) {
+        //ignore , it is not a big deal
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
 
-          synchronized (confDirectoryListeners) {
-            // if this is not among directories to be watched then don't set the watcher anymore
-            if( !confDirectoryListeners.containsKey(zkDir)) {
-              log.info("Watcher on {} is removed ", zkDir);
-              return;
-            }
-            Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
-            if (listeners != null && !listeners.isEmpty()) {
-              final Set<Runnable> listenersCopy = new HashSet<>(listeners);
-              new Thread() {
-                //run these in a separate thread because this can be long running
-                public void run() {
-                  for (final Runnable listener : listenersCopy) {
-                    try {
-                      listener.run();
-                    } catch (Exception e) {
-                      log.warn("listener throws error", e);
-                    }
-                  }
-                }
-              }.start();
-            }
+      boolean resetWatcher = false;
+      try {
+        resetWatcher = fireEventListeners(zkDir);
+      } finally {
+        if (Event.EventType.None.equals(event.getType())) {
+          log.info("A node got unwatched for {}", zkDir);
+        } else {
+          if (resetWatcher) setConfWatcher(zkDir, this, stat);
+          else log.info("A node got unwatched for {}", zkDir);
+        }
+      }
+    }
 
-          }
+  }
 
-        } finally {
-          if (Event.EventType.None.equals(event.getType())) {
-            log.info("A node got unwatched for {}", zkDir);
-            return;
-          } else {
-            setConfWatcher(zkDir,this);
+  private boolean fireEventListeners(String zkDir) {
+    synchronized (confDirectoryListeners) {
+      // if this is not among directories to be watched then don't set the watcher anymore
+      if (!confDirectoryListeners.containsKey(zkDir)) {
+        log.info("Watcher on {} is removed ", zkDir);
+        return false;
+      }
+      Set<Runnable> listeners = confDirectoryListeners.get(zkDir);
+      if (listeners != null && !listeners.isEmpty()) {
+        final Set<Runnable> listenersCopy = new HashSet<>(listeners);
+        new Thread() {
+          //run these in a separate thread because this can be long running
+          public void run() {
+            log.info("Running listeners for {}", zkDir);
+            for (final Runnable listener : listenersCopy) {
+              try {
+                listener.run();
+              } catch (Exception e) {
+                log.warn("listener throws error", e);
+              }
+            }
           }
-        }
+        }.start();
       }
+
     }
+    return true;
+  }
 
-  private void setConfWatcher(String zkDir, Watcher watcher) {
+  private void setConfWatcher(String zkDir, Watcher watcher, Stat stat) {
     try {
-      zkClient.exists(zkDir, watcher, true);
+      Stat newStat = zkClient.exists(zkDir, watcher, true);
+      if (stat != null && newStat.getVersion() > stat.getVersion()) {
+        //a race condition where we missed an event that was fired
+        //so fire the event listeners
+        fireEventListeners(zkDir);
+      }
     } catch (KeeperException e) {
       log.error("failed to set watcher for conf dir {} ", zkDir);
     } catch (InterruptedException e) {
@@ -2286,13 +2384,19 @@ public final class ZkController {
     return new OnReconnect() {
       @Override
       public void command() {
-        synchronized (confDirectoryListeners){
+        synchronized (confDirectoryListeners) {
           for (String s : confDirectoryListeners.keySet()) {
             watchZKConfDir(s);
+            fireEventListeners(s);
           }
         }
       }
     };
   }
 
+  public String getLeaderSeqPath(String collection, String coreNodeName) {
+    ContextKey key = new ContextKey(collection, coreNodeName);
+    ElectionContext context = electionContexts.get(key);
+    return context != null ? context.leaderSeqPath : null;
+  }
 }