You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/06 07:47:25 UTC

[2/2] lucene-solr:jira/solr-11702: SOLR-11702: Support rolling updates

SOLR-11702: Support rolling updates


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/917dbc65
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/917dbc65
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/917dbc65

Branch: refs/heads/jira/solr-11702
Commit: 917dbc6530e698ea7cc92074a28c65d0a25f9cda
Parents: b52186b
Author: Cao Manh Dat <da...@apache.org>
Authored: Sat Jan 6 14:47:10 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Sat Jan 6 14:47:10 2018 +0700

----------------------------------------------------------------------
 .../client/solrj/embedded/JettySolrRunner.java  |  16 +-
 .../org/apache/solr/cloud/ElectionContext.java  | 117 ++++-
 .../cloud/LeaderInitiatedRecoveryThread.java    | 348 ++++++++++++++
 .../org/apache/solr/cloud/ZkController.java     | 359 ++++++++++++++-
 .../org/apache/solr/cloud/ZkShardTerms.java     |   4 +
 .../solr/handler/admin/CollectionsHandler.java  |  10 +
 .../processor/DistributedUpdateProcessor.java   |  36 +-
 .../solr/cloud/LIRRollingUpdatesTest.java       | 458 +++++++++++++++++++
 .../TestLeaderInitiatedRecoveryThread.java      | 243 ++++++++++
 9 files changed, 1563 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 608b234..4eec058 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,7 +84,7 @@ public class JettySolrRunner {
   FilterHolder debugFilter;
 
   private boolean waitOnSolr = false;
-  private int lastPort = -1;
+  private int jettyPort = -1;
 
   private final JettyConfig config;
   private final String solrHome;
@@ -280,8 +280,10 @@ public class JettySolrRunner {
       @Override
       public void lifeCycleStarted(LifeCycle arg0) {
 
-        lastPort = getFirstConnectorPort();
-        nodeProperties.setProperty("hostPort", Integer.toString(lastPort));
+        jettyPort = getFirstConnectorPort();
+        int port = jettyPort;
+        if (proxyPort != -1) port = proxyPort;
+        nodeProperties.setProperty("hostPort", Integer.toString(port));
         nodeProperties.setProperty("hostContext", config.context);
 
         root.getServletContext().setAttribute(SolrDispatchFilter.PROPERTIES_ATTRIBUTE, nodeProperties);
@@ -384,7 +386,7 @@ public class JettySolrRunner {
       // if started before, make a new server
       if (startedBefore) {
         waitOnSolr = false;
-        int port = reusePort ? lastPort : this.config.port;
+        int port = reusePort ? jettyPort : this.config.port;
         init(port);
       } else {
         startedBefore = true;
@@ -456,7 +458,7 @@ public class JettySolrRunner {
     if (0 == conns.length) {
       throw new RuntimeException("Jetty Server has no Connectors");
     }
-    return (proxyPort != -1) ? proxyPort : ((ServerConnector) conns[0]).getLocalPort();
+    return ((ServerConnector) conns[0]).getLocalPort();
   }
   
   /**
@@ -465,10 +467,10 @@ public class JettySolrRunner {
    * @exception RuntimeException if there is no Connector
    */
   public int getLocalPort() {
-    if (lastPort == -1) {
+    if (jettyPort == -1) {
       throw new IllegalStateException("You cannot get the port until this instance has started");
     }
-    return (proxyPort != -1) ? proxyPort : lastPort;
+    return (proxyPort != -1) ? proxyPort : jettyPort;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index d659c73..d6efd1d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Future;
@@ -312,7 +313,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
       }
       coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-      if (!zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+      if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+          && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
         // no need to do recovery here, we already has term less than leader term, the recovery will be triggered later
         log.info("Can not become leader, this core has term less than leader's term");
         cancelElection();
@@ -451,6 +453,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       boolean isLeader = true;
       if (!isClosed) {
         try {
+          //TODO remove this on 8.0, SOLR-11812
+          // we must check LIR before registering as leader
+          checkLIR(coreName, allReplicasInLine);
           if (replicaType == Replica.Type.TLOG) {
             // stop replicate from old leader
             zkController.stopReplicationFromLeader(coreName);
@@ -499,6 +504,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
           }
         }
 
+        if (isLeader) {
+          // check for any replicas in my shard that were set to down by the previous leader
+          try {
+            startLeaderInitiatedRecoveryOnReplicas(coreName);
+          } catch (Exception exc) {
+            // don't want leader election to fail because of
+            // an error trying to tell others to recover
+          }
+        }
       } else {
         cancelElection();
       }
@@ -528,6 +542,107 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     return docCollection.getReplica(replicaName);
   }
 
+  //TODO remove this method on 8.0, SOLR-11812. Legacy LIR gate: clears this core's DOWN entry when all replicas took part in the election, otherwise throws if this core is still marked DOWN/RECOVERING.
+  public void checkLIR(String coreName, boolean allReplicasInLine)
+      throws InterruptedException, KeeperException, IOException {
+    if (allReplicasInLine) {
+      log.info("Found all replicas participating in election, clear LIR");
+      // SOLR-8075: A bug may allow the proper leader to get marked as LIR DOWN and
+      // if we are marked as DOWN but were able to become the leader, we remove
+      // the DOWN entry here so that we don't fail publishing ACTIVE due to being in LIR.
+      // We only do this if all the replicas participated in the election just in case
+      // this was a valid LIR entry and the proper leader replica is missing.
+      try (SolrCore core = cc.getCore(coreName)) { // NOTE(review): unlike the else branch below, core is not null-checked here - confirm cc.getCore cannot return null
+        final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
+            core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+        if (lirState == Replica.State.DOWN) {
+          // We can do this before registering as leader because only setting DOWN requires that
+          // we are already registered as leader, and here we are setting ACTIVE
+          // The fact that we just won the zk leader election provides a quasi lock on setting this state, but
+          // we should improve this: see SOLR-8075 discussion
+          zkController.updateLeaderInitiatedRecoveryState(collection, shardId,
+              leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
+        }
+      }
+
+    } else {
+      try (SolrCore core = cc.getCore(coreName)) {
+        if (core != null) { // a missing core is silently accepted: nothing is checked and nothing is thrown
+          final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId,
+              core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+          if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERING) {
+            log.warn("The previous leader marked me " + core.getName()
+                + " as " + lirState.toString() + " and I haven't recovered yet, so I shouldn't be the leader.");
+
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Leader Initiated Recovery prevented leadership"); // reports that this core must not become leader
+          }
+        }
+      }
+    }
+  }
+
+  //TODO remove this method on 8.0, SOLR-11812. Walks the children of the shard's LIR znode and, for every replica other than this core, either raises this core's term above it or re-triggers legacy leader-initiated recovery.
+  private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
+    try (SolrCore core = cc.getCore(coreName)) {
+      CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor(); // NOTE(review): core is dereferenced without a null check - confirm cc.getCore cannot return null here
+      String coll = cloudDesc.getCollectionName();
+      String shardId = cloudDesc.getShardId();
+      String coreNodeName = cloudDesc.getCoreNodeName();
+
+      if (coll == null || shardId == null) {
+        log.error("Cannot start leader-initiated recovery on new leader (core="+
+            coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
+        return;
+      }
+
+      String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
+      List<String> replicas = null;
+      try {
+        replicas = zkClient.getChildren(znodePath, null, false);
+      } catch (NoNodeException nne) {
+        // this can be ignored: replicas stays null and the loop below is skipped
+      }
+
+      if (replicas != null && replicas.size() > 0) {
+        for (String replicaCoreNodeName : replicas) {
+
+          if (coreNodeName.equals(replicaCoreNodeName))
+            continue; // added safe-guard so we don't mark this core as down
+
+          if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+            // this replica has registered its term, so it is running with the new LIR implementation;
+            // we can put this replica into recovery by increasing our own term
+            zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
+            continue;
+          }
+
+          final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
+          if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
+            log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+                + lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
+            List<ZkCoreNodeProps> replicaProps =
+                zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName); // third argument is this core's own coreNodeName, not the replica's
+
+            if (replicaProps != null && replicaProps.size() > 0) {
+              ZkCoreNodeProps coreNodeProps = null; // NOTE(review): stays null if no entry matches below, yet is still passed on - confirm the callee tolerates null
+              for (ZkCoreNodeProps p : replicaProps) {
+                if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
+                  coreNodeProps = p;
+                  break;
+                }
+              }
+
+              zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
+                  collection, shardId, coreNodeProps, core.getCoreDescriptor(),
+                  false /* forcePublishState */);
+            }
+          }
+        }
+      }
+    } // core gets closed automagically
+  }
+
+
   // returns true if all replicas are found to be up, false if not
   private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
new file mode 100644
index 0000000..88baac5
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestRecovery;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.KeeperException;
+import org.apache.solr.util.RTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.util.List;
+
+/**
+ * Background daemon thread that tries to send the REQUESTRECOVERY to a downed
+ * replica; used by a shard leader to nag a replica into recovering after the
+ * leader experiences an error trying to send an update request to the replica.
+ */
+//TODO remove this class on 8.0, SOLR-11812
+public class LeaderInitiatedRecoveryThread extends Thread {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected ZkController zkController;
+  protected CoreContainer coreContainer;
+  protected String collection;
+  protected String shardId;
+  protected ZkCoreNodeProps nodeProps;
+  protected int maxTries;
+  private CoreDescriptor leaderCd;
+  
+  public LeaderInitiatedRecoveryThread(ZkController zkController, 
+                                       CoreContainer cc, 
+                                       String collection, 
+                                       String shardId, 
+                                       ZkCoreNodeProps nodeProps,
+                                       int maxTries,
+                                       CoreDescriptor leaderCd)
+  { // stores the arguments as-is; nodeProps describes the replica to recover, leaderCd the leader core that starts this thread
+    super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName()); // thread name carries the replica's core name
+    this.zkController = zkController;
+    this.coreContainer = cc;
+    this.collection = collection;
+    this.shardId = shardId;    
+    this.nodeProps = nodeProps;
+    this.maxTries = maxTries; // upper bound on REQUESTRECOVERY attempts in sendRecoveryCommandWithRetry
+    this.leaderCd = leaderCd;
+    setDaemon(true); // daemon thread: does not keep the JVM alive on its own
+  }
+  
+  public void run() { // marks the replica DOWN via publishDownState and, if that returns true, sends it REQUESTRECOVERY with retries
+    RTimer timer = new RTimer();
+
+    String replicaCoreName = nodeProps.getCoreName();
+    String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
+    String replicaNodeName = nodeProps.getNodeName();
+    final String replicaUrl = nodeProps.getCoreUrl();
+
+    if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) { // precondition: ZkController must already be tracking this replica URL
+      throw new SolrException(ErrorCode.INVALID_STATE, "Replica: " + replicaUrl + " should have been marked under leader initiated recovery in ZkController but wasn't.");
+    }
+
+    boolean sendRecoveryCommand = publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, false);
+
+    if (sendRecoveryCommand)  {
+      try {
+        sendRecoveryCommandWithRetry();
+      } catch (Exception exc) {
+        log.error(getName()+" failed due to: "+exc, exc);
+        if (exc instanceof SolrException) { // SolrException is rethrown unchanged; anything else is wrapped as SERVER_ERROR
+          throw (SolrException)exc;
+        } else {
+          throw new SolrException(ErrorCode.SERVER_ERROR, exc);
+        }
+      } finally {
+        zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl); // stop tracking the replica whether or not sending succeeded
+      }
+    } else  {
+      // replica is no longer in recovery on this node (may be handled on another node)
+      zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
+    }
+    log.info("{} completed successfully after running for {}ms", getName(), timer.getTime()); // NOTE(review): also logged when no recovery command was sent
+  }
+
+  public boolean publishDownState(String replicaCoreName, String replicaCoreNodeName, String replicaNodeName, String replicaUrl, boolean forcePublishState) {
+    boolean sendRecoveryCommand = true; // return value: whether the caller should go on to send REQUESTRECOVERY
+    boolean publishDownState = false; // becomes true only after the LIR state update below succeeded
+
+    if (zkController.getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+      try {
+        // create a znode that the replica needs to "ack" to verify it knows it was out-of-sync
+        updateLIRState(replicaCoreNodeName);
+
+        log.info("Put replica core={} coreNodeName={} on " +
+            replicaNodeName + " into leader-initiated recovery.", replicaCoreName, replicaCoreNodeName);
+        publishDownState = true;
+      } catch (Exception e) {
+        Throwable setLirZnodeFailedCause = SolrException.getRootCause(e);
+        log.error("Leader failed to set replica " +
+            nodeProps.getCoreUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
+        if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException
+            || setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException
+            || setLirZnodeFailedCause instanceof ZkController.NotLeaderException) {
+          // session expired, connection lost, or we are not the leader: our state is suspect, so don't go
+          // putting other replicas in recovery (see SOLR-6511)
+          sendRecoveryCommand = false;
+          forcePublishState = false; // no need to force publish any state in this case
+        } // else will go ahead and try to send the recovery command once after this error
+      }
+    } else  {
+      log.info("Node " + replicaNodeName +
+              " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
+          replicaCoreName, replicaCoreNodeName);
+      // publishDownState will be false to avoid publishing the "down" state too many times
+      // as many errors can occur together and will each call into this method (SOLR-6189)
+      forcePublishState = false; // no need to force publish the state because replica is not live
+      sendRecoveryCommand = false; // no need to send recovery messages as well
+    }
+
+    try {
+      if (publishDownState || forcePublishState) {
+        ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+            ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+            ZkStateReader.BASE_URL_PROP, nodeProps.getBaseUrl(),
+            ZkStateReader.CORE_NAME_PROP, nodeProps.getCoreName(),
+            ZkStateReader.NODE_NAME_PROP, nodeProps.getNodeName(),
+            ZkStateReader.SHARD_ID_PROP, shardId,
+            ZkStateReader.COLLECTION_PROP, collection);
+        log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}",
+            replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
+        zkController.getOverseerJobQueue().offer(Utils.toJSON(m)); // enqueue the "state" operation carrying DOWN on the Overseer job queue
+      }
+    } catch (Exception e) {
+      log.error("Could not publish 'down' state for replicaUrl: {}", replicaUrl, e); // failure is only logged; the return value is not affected
+    }
+
+    return sendRecoveryCommand;
+  }
+
+  /*
+  Sets the replica's leader-initiated recovery state to DOWN through ZkController; protected scope for testing purposes
+   */
+  protected void updateLIRState(String replicaCoreNodeName) {
+    zkController.updateLeaderInitiatedRecoveryState(collection,
+        shardId,
+        replicaCoreNodeName, Replica.State.DOWN, leaderCd, true);
+  }
+
+  protected void sendRecoveryCommandWithRetry() throws Exception {    // sends REQUESTRECOVERY to the replica up to maxTries times, re-checking between tries whether retrying still makes sense
+    int tries = 0;
+    long waitBetweenTriesMs = 5000L;
+    boolean continueTrying = true;
+
+    String replicaCoreName = nodeProps.getCoreName();
+    String recoveryUrl = nodeProps.getBaseUrl();
+    String replicaNodeName = nodeProps.getNodeName();
+    String coreNeedingRecovery = nodeProps.getCoreName(); // same value as replicaCoreName above
+    String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
+    String replicaUrl = nodeProps.getCoreUrl();
+    
+    log.info(getName()+" started running to send REQUESTRECOVERY command to "+replicaUrl+
+        "; will try for a max of "+(maxTries * (waitBetweenTriesMs/1000))+" secs");
+
+    RequestRecovery recoverRequestCmd = new RequestRecovery();
+    recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
+    recoverRequestCmd.setCoreName(coreNeedingRecovery);
+    
+    while (continueTrying && ++tries <= maxTries) {
+      if (tries > 1) {
+        log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
+            " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName); // NOTE(review): tries already counts the attempt about to start
+      } else {
+        log.info("Asking core={} coreNodeName={} on " + recoveryUrl + " to recover", coreNeedingRecovery, replicaCoreNodeName);
+      }
+
+      try (HttpSolrClient client = new HttpSolrClient.Builder(recoveryUrl)
+          .withConnectionTimeout(15000)
+          .withSocketTimeout(60000)
+          .build()) {
+        try {
+          client.request(recoverRequestCmd);
+          
+          log.info("Successfully sent " + CoreAdminAction.REQUESTRECOVERY +
+              " command to core={} coreNodeName={} on " + recoveryUrl, coreNeedingRecovery, replicaCoreNodeName);
+          
+          continueTrying = false; // succeeded, so stop looping
+        } catch (Exception t) {
+          Throwable rootCause = SolrException.getRootCause(t);
+          boolean wasCommError =
+              (rootCause instanceof ConnectException ||
+                  rootCause instanceof ConnectTimeoutException ||
+                  rootCause instanceof NoHttpResponseException ||
+                  rootCause instanceof SocketException);
+
+          SolrException.log(log, recoveryUrl + ": Could not tell a replica to recover", t);
+          
+          if (!wasCommError) { // only the communication failures listed above are retried
+            continueTrying = false;
+          }                                                
+        }
+      }
+      
+      // wait a few seconds
+      if (continueTrying) {
+        try {
+          Thread.sleep(waitBetweenTriesMs);
+        } catch (InterruptedException ignoreMe) {
+          Thread.currentThread().interrupt();          // restore the flag; NOTE(review): the loop is not left here, so later sleeps would return immediately - confirm intended
+        }
+        
+        if (coreContainer.isShutDown()) {
+          log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
+              + replicaNodeName + " because my core container is closed.", coreNeedingRecovery, replicaCoreNodeName);
+          continueTrying = false;
+          break;
+        }
+        
+        // see if the replica's node is still live, if not, no need to keep doing this loop
+        ZkStateReader zkStateReader = zkController.getZkStateReader();
+        if (!zkStateReader.getClusterState().liveNodesContain(replicaNodeName)) {
+          log.warn("Node "+replicaNodeName+" hosting core "+coreNeedingRecovery+
+              " is no longer live. No need to keep trying to tell it to recover!");
+          continueTrying = false;
+          break;
+        }
+
+        String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
+        // stop trying if I'm no longer the leader
+        if (leaderCoreNodeName != null && collection != null) {
+          String leaderCoreNodeNameFromZk = null;
+          try {
+            leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
+          } catch (Exception exc) {
+            log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
+                " " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
+          }
+          if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) { // also true when the lookup above failed and the value is still null
+            log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+                ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+                leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
+            continueTrying = false;
+            break;
+          }
+          if (!leaderCd.getCloudDescriptor().isLeader()) {
+            log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+                ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+                leaderCoreNodeName + " is no longer the leader!");
+            continueTrying = false;
+            break;
+          }
+        }
+
+        // additional safeguard against the replica trying to be in the active state
+        // before acknowledging the leader initiated recovery command
+        if (collection != null && shardId != null) {
+          try {
+            // call out to ZooKeeper to get the leader-initiated recovery state
+            final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName);
+            
+            if (lirState == null) {
+              log.warn("Stop trying to send recovery command to downed replica core="+coreNeedingRecovery+
+                  ",coreNodeName=" + replicaCoreNodeName + " on "+replicaNodeName+" because the znode no longer exists.");
+              continueTrying = false;
+              break;              
+            }
+            
+            if (lirState == Replica.State.RECOVERING) {
+              // replica has ack'd leader initiated recovery and entered the recovering state
+              // so we don't need to keep looping to send the command
+              continueTrying = false;  
+              log.info("Replica "+coreNeedingRecovery+
+                  " on node "+replicaNodeName+" ack'd the leader initiated recovery state, "
+                      + "no need to keep trying to send recovery command");
+            } else {
+              String lcnn = zkStateReader.getLeaderRetry(collection, shardId, 5000).getName();
+              List<ZkCoreNodeProps> replicaProps = 
+                  zkStateReader.getReplicaProps(collection, shardId, lcnn);
+              if (replicaProps != null && replicaProps.size() > 0) {
+                for (ZkCoreNodeProps prop : replicaProps) {
+                  final Replica replica = (Replica) prop.getNodeProps();
+                  if (replicaCoreNodeName.equals(replica.getName())) {
+                    if (replica.getState() == Replica.State.ACTIVE) {
+                      // replica published its state as "active",
+                      // which is bad if lirState is still "down"
+                      if (lirState == Replica.State.DOWN) {
+                        // OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
+                        // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
+                        log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
+                            + " forcing it back to down state to re-run the leader-initiated recovery process; props: " + replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName); // NOTE(review): logs the first list entry, not necessarily the matched prop
+                        publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, true);
+                      }
+                    }
+                    break;
+                  }
+                }
+              }
+            }                  
+          } catch (Exception ignoreMe) {
+            log.warn("Failed to determine state of core={} coreNodeName={} due to: "+ignoreMe, coreNeedingRecovery, replicaCoreNodeName);
+            // eventually this loop will exhaust max tries and stop so we can just log this for now
+          }                
+        }
+      }
+    }
+    
+    // replica is no longer in recovery on this node (may be handled on another node); NOTE(review): run() repeats this call after this method returns
+    zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
+    
+    if (continueTrying) {
+      // ugh! this means the loop timed out before the recovery command could be delivered
+      // how exotic do we want to get here?
+      log.error("Timed out after waiting for "+(tries * (waitBetweenTriesMs/1000))+
+          " secs to send the recovery request to: "+replicaUrl+"; not much more we can do here?"); // NOTE(review): tries is maxTries + 1 once the loop condition has failed
+      
+      // TODO: need to raise a JMX event to allow monitoring tools to take over from here
+      
+    }    
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 7f49d26..2e495d3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -107,6 +107,7 @@ import org.apache.solr.util.RTimer;
 import org.apache.solr.util.RefCounted;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.Op;
@@ -115,6 +116,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 
 import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
@@ -191,7 +193,6 @@ public class ZkController {
   private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
 
   private final SolrZkClient zkClient;
-  private final ZkCmdExecutor cmdExecutor;
   public final ZkStateReader zkStateReader;
   private SolrCloudManager cloudManager;
   private CloudSolrClient cloudSolrClient;
@@ -225,6 +226,10 @@ public class ZkController {
 
   private volatile boolean isClosed;
 
+  //TODO remove in 8.0, SOLR-11812
+  // keeps track of replicas that have been asked to recover by leaders running on this node
+  private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
+
   // This is an expert and unsupported development mode that does not create
   // an Overseer or register a /live node. This let's you monitor the cluster
   // and interact with zookeeper via the Solr admin UI on a node outside the cluster,
@@ -431,7 +436,6 @@ public class ZkController {
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
-    cmdExecutor = new ZkCmdExecutor(clientTimeout);
     zkStateReader = new ZkStateReader(zkClient, () -> {
       if (cc != null) cc.securityNodeChanged();
     });
@@ -1032,7 +1036,9 @@ public class ZkController {
       final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
       assert coreZkNodeName != null : "we should have a coreNodeName by now";
 
-      getShardTerms(collection, cloudDesc.getShardId()).registerTerm(coreZkNodeName);
+      if ("new".equals(desc.getCoreProperty("lirVersion", "new"))) {
+        getShardTerms(collection, cloudDesc.getShardId()).registerTerm(coreZkNodeName);
+      }
       String shardId = cloudDesc.getShardId();
       Map<String,Object> props = new HashMap<>();
       // we only put a subset of props into the leader node
@@ -1123,8 +1129,10 @@ public class ZkController {
           }
           publish(desc, Replica.State.ACTIVE);
         }
-        
-        getShardTerms(collection, shardId).addListener(new RecoveringCoreTermWatcher(core));
+
+        if ("new".equals(desc.getCoreProperty("lirVersion", "new"))) {
+          getShardTerms(collection, shardId).addListener(new RecoveringCoreTermWatcher(core));
+        }
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
       }
       
@@ -1312,7 +1320,18 @@ public class ZkController {
         return true;
       }
 
-      if (!getShardTerms(collection, shardId).canBecomeLeader(coreZkNodeName)) {
+      //TODO remove in 8.0, SOLR-11812
+      // see if the leader told us to recover
+      final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId,
+          core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
+      if (lirState == Replica.State.DOWN) {
+        log.info("Leader marked core " + core.getName() + " down; starting recovery process");
+        core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+        return true;
+      }
+
+      ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
+      if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
         log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
         return true;
@@ -1367,13 +1386,31 @@ public class ZkController {
       String shardId = cd.getCloudDescriptor().getShardId();
       
       String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
+      //TODO remove in 8.0, SOLR-11812
       // If the leader initiated recovery, then verify that this replica has performed
       // recovery as requested before becoming active; don't even look at lirState if going down
-      if (state == Replica.State.ACTIVE && !getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
-        throw new SolrException(ErrorCode.INVALID_STATE,
-            "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
+      if (state != Replica.State.DOWN) {
+        final Replica.State lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
+        if (lirState != null) {
+          assert cd.getCloudDescriptor().getReplicaType() != Replica.Type.PULL: "LIR should not happen for pull replicas!";
+          if (state == Replica.State.ACTIVE) {
+            // trying to become active, so leader-initiated state must be recovering
+            if (lirState == Replica.State.RECOVERING) {
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, cd, true);
+            } else if (lirState == Replica.State.DOWN) {
+              throw new SolrException(ErrorCode.INVALID_STATE,
+                  "Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
+            }
+          } else if (state == Replica.State.RECOVERING) {
+            // if it is currently DOWN, then trying to enter into recovering state is good
+            if (lirState == Replica.State.DOWN) {
+              updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, cd, true);
+            }
+          }
+        }
       }
-      
+
       Map<String,Object> props = new HashMap<>();
       props.put(Overseer.QUEUE_OPERATION, "state");
       props.put(ZkStateReader.STATE_PROP, state.toString());
@@ -1409,7 +1446,7 @@ public class ZkController {
         log.info("The core '{}' had failed to initialize before.", cd.getName());
       }
 
-      if (state == Replica.State.RECOVERING) {
+      if (state == Replica.State.RECOVERING && "new".equals(cd.getCoreProperty("lirVersion", "new"))) {
         getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
       }
       ZkNodeProps m = new ZkNodeProps(props);
@@ -1713,7 +1750,18 @@ public class ZkController {
     boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
     if (!isLeader && !SKIP_AUTO_RECOVERY) {
 
-      if (!getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
+      // detect if this core is in leader-initiated recovery and if so,
+      // then we don't need the leader to wait on seeing the down state
+      // TODO remove getting LIR state in 8.0, SOLR-11812
+      Replica.State lirState = null;
+      try {
+        lirState = getLeaderInitiatedRecoveryState(collection, shard, myCoreNodeName);
+      } catch (Exception exc) {
+        log.error("Failed to determine if replica " + myCoreNodeName +
+            " is in leader-initiated recovery due to: " + exc, exc);
+      }
+
+      if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
         log.debug("Term of replica " + myCoreNodeName +
             " is already less than leader, so not waiting for leader to see down state.");
       } else {
@@ -2017,6 +2065,284 @@ public class ZkController {
     return cc;
   }
 
+  //TODO remove all old LIR implementation in 8.0, SOLR-11812
+  /**
+   * When a leader receives a communication error when trying to send a request to a replica,
+   * it calls this method to ensure the replica enters recovery when connectivity is restored.
+   *
+   * @return true if the replica's node is in the cluster state's live nodes, in which case a
+   *         LeaderInitiatedRecoveryThread was handed to the update executor; false if the node is
+   *         not live, or if the replica is already tracked here and forcePublishState is false
+   */
+  public boolean ensureReplicaInLeaderInitiatedRecovery(
+      final CoreContainer container,
+      final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
+      CoreDescriptor leaderCd, boolean forcePublishState)
+      throws KeeperException, InterruptedException {
+    final String replicaUrl = replicaCoreProps.getCoreUrl();
+
+    if (collection == null)
+      throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
+
+    if (shardId == null)
+      throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: " + replicaUrl);
+
+    if (replicaUrl == null)
+      throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");
+
+    // First, determine if this replica is already in recovery handling
+    // which is needed because there can be many concurrent errors flooding in
+    // about the same replica having trouble and we only need to send the "needs"
+    // recovery signal once
+    boolean nodeIsLive = true;
+    String replicaNodeName = replicaCoreProps.getNodeName();
+    String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
+    assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
+    synchronized (replicasInLeaderInitiatedRecovery) {
+      if (replicasInLeaderInitiatedRecovery.containsKey(replicaUrl)) {
+        if (!forcePublishState) {
+          log.debug("Replica {} already in leader-initiated recovery handling.", replicaUrl);
+          return false; // already in this recovery process
+        }
+      }
+
+      // we only really need to try to start the LIR process if the node itself is "live"
+      if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+
+        LeaderInitiatedRecoveryThread lirThread =
+            new LeaderInitiatedRecoveryThread(this,
+                container,
+                collection,
+                shardId,
+                replicaCoreProps,
+                120, // NOTE(review): presumably a retry/time budget for the LIR thread - confirm
+                leaderCd);
+        ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
+        try {
+          MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
+          executor.execute(lirThread);
+        } finally {
+          MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
+        }
+
+        // remember the replica (URL -> its LIR znode path) so concurrent errors see it as already handled
+        replicasInLeaderInitiatedRecovery.put(replicaUrl,
+            getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
+        log.info("Put replica core={} coreNodeName={} on " +
+            replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
+      } else {
+        nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
+        log.info("Node " + replicaNodeName +
+                " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
+            replicaCoreProps.getCoreName(), replicaCoreNodeName);
+        // nothing is recorded or started for a non-live node; many errors can occur together
+        // and will each call into this method (SOLR-6189)
+      }
+    }
+
+    return nodeIsLive;
+  }
+
+  /** Tells whether {@code replicaUrl} is currently tracked in the leader-initiated recovery map. */
+  public boolean isReplicaInRecoveryHandling(String replicaUrl) {
+    // the tracking map is a plain HashMap, so it is only read while holding its monitor
+    synchronized (replicasInLeaderInitiatedRecovery) {
+      return replicasInLeaderInitiatedRecovery.containsKey(replicaUrl);
+    }
+  }
+  /** Drops {@code replicaUrl} from the map of replicas tracked as being in leader-initiated recovery. */
+  public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
+    synchronized (replicasInLeaderInitiatedRecovery) {
+      replicasInLeaderInitiatedRecovery.remove(replicaUrl);
+    }
+  }
+
+  /**
+   * Returns the state stored in a replica's LIR znode, or null if no state object or state property exists.
+   */
+  public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
+    final Map<String, Object> lirData = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+    final String rawState = lirData == null ? null : (String) lirData.get(ZkStateReader.STATE_PROP);
+    return rawState == null ? null : Replica.State.getState(rawState);
+  }
+
+  /**
+   * Reads and parses a replica's leader-initiated recovery (LIR) znode. Returns null if an argument is
+   * null, the znode is missing or empty, or the read hit a connection loss or session expiry (logged
+   * only); any other read failure, or JSON data that is not an object, raises a SolrException.
+   */
+  public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
+    if (collection == null || shardId == null || coreNodeName == null)
+      return null; // if we don't have complete data about a core in cloud mode, return null
+    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
+    byte[] stateData = null;
+    try {
+      stateData = zkClient.getData(znodePath, null, new Stat(), false);
+    } catch (NoNodeException ignoreMe) {
+      // safe to ignore as this znode will only exist if the leader initiated recovery
+    } catch (ConnectionLossException | SessionExpiredException cle) {
+      // sort of safe to ignore ??? Usually these are seen when the core is going down
+      // or there are bigger issues to deal with than reading this znode
+      log.warn("Unable to read " + znodePath + " due to: " + cle);
+    } catch (Exception exc) {
+      if (exc instanceof InterruptedException) {
+        Thread.currentThread().interrupt(); // wrapped below, so keep the interrupt visible to callers
+      }
+      log.error("Failed to read data from znode " + znodePath + " due to: " + exc);
+      if (exc instanceof SolrException) {
+        throw (SolrException) exc;
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Failed to read data from znodePath: " + znodePath, exc);
+    }
+
+    if (stateData == null || stateData.length == 0) {
+      return null;
+    }
+    // TODO: Remove later ... this is for upgrading from 4.8.x to 4.10.3 (see: SOLR-6732)
+    if (stateData[0] != (byte) '{') {
+      return Utils.makeMap("state", new String(stateData, StandardCharsets.UTF_8)); // old format still in ZK
+    }
+    Object parsedJson = Utils.fromJSON(stateData);
+    if (!(parsedJson instanceof Map)) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Leader-initiated recovery state data is invalid! " + parsedJson);
+    }
+    return (Map<String, Object>) parsedJson;
+  }
+
+  /**
+   * Records a replica's leader-initiated recovery (LIR) state in its znode. ACTIVE deletes the znode (a
+   * failed delete is only logged), DOWN goes through markShardAsDownIfLeader, other states are written
+   * directly; other write failures raise SolrException. Null collection/shardId/coreNodeName: warn only.
+   */
+  public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
+      Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
+    if (collection == null || shardId == null || coreNodeName == null) {
+      log.warn("Cannot set leader-initiated recovery state znode to "
+          + state.toString() + " using: collection=" + collection
+          + "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
+      return; // if we don't have complete data about a core in cloud mode, do nothing
+    }
+    assert leaderCd != null;
+    assert leaderCd.getCloudDescriptor() != null;
+    String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
+    String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
+
+    if (state == Replica.State.ACTIVE) {
+      // since we're marking it active, we don't need this znode anymore, so delete instead of update
+      try {
+        zkClient.delete(znodePath, -1, retryOnConnLoss);
+      } catch (Exception justLogIt) {
+        log.warn("Failed to delete znode " + znodePath, justLogIt);
+      }
+      return;
+    }
+
+    Map<String, Object> stateObj = null;
+    try {
+      stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+    } catch (Exception exc) {
+      log.warn(exc.getMessage(), exc);
+    }
+    if (stateObj == null) {
+      stateObj = Utils.makeMap();
+    }
+
+    stateObj.put(ZkStateReader.STATE_PROP, state.toString());
+    // only update the createdBy value if it's not set
+    if (stateObj.get("createdByNodeName") == null) {
+      stateObj.put("createdByNodeName", this.nodeName);
+    }
+    if (stateObj.get("createdByCoreNodeName") == null && leaderCoreNodeName != null)  {
+      stateObj.put("createdByCoreNodeName", leaderCoreNodeName);
+    }
+    byte[] znodeData = Utils.toJSON(stateObj);
+    try {
+      // for non-DOWN states: must retry on conn loss otherwise future election attempts may assume wrong LIR state
+      if (state == Replica.State.DOWN) {
+        markShardAsDownIfLeader(collection, shardId, leaderCd, znodePath, znodeData, retryOnConnLoss);
+      } else if (zkClient.exists(znodePath, true)) {
+        zkClient.setData(znodePath, znodeData, retryOnConnLoss);
+      } else {
+        zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+      }
+      log.debug("Wrote {} to {}", state.toString(), znodePath);
+    } catch (Exception exc) {
+      if (exc instanceof InterruptedException) {
+        Thread.currentThread().interrupt(); // wrapped below, so keep the interrupt visible to callers
+      }
+      if (exc instanceof SolrException) {
+        throw (SolrException) exc;
+      }
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Failed to update data to " + state.toString() + " for znode: " + znodePath, exc);
+    }
+  }
+
+  /**
+   * Writes znodeData to a replica's LIR znode, but only while this core is still the leader: the write
+   * is bundled in a ZK multi with a version check on the parent of our leader path, so that a long
+   * network partition or GC pause can't let us mark a replica down *after* we've lost our session.
+   * @throws NotLeaderException if the local leader flag is unset or no election context is registered
+   */
+  private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
+                                       String znodePath, byte[] znodeData,
+                                       boolean retryOnConnLoss) throws KeeperException, InterruptedException {
+
+    // cheap local check first: abort before touching ZK if we already know we are not the leader
+    if (!leaderCd.getCloudDescriptor().isLeader()) {
+      log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
+      throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
+    }
+
+    ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
+    ElectionContext context = electionContexts.get(key);
+
+    // we make sure we locally think we are the leader before and after getting the context - then
+    // we only try zk if we still think we are the leader and have our leader context
+    if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
+      log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
+      throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
+    }
+
+    // we think we are the leader - get the expected shard leader version
+    // we use this version and multi to ensure *only* the current zk registered leader
+    // for a shard can put a replica into LIR
+
+    Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
+
+    // TODO: should we do this optimistically to avoid races?
+    if (zkClient.exists(znodePath, retryOnConnLoss)) {
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+      ops.add(Op.setData(znodePath, znodeData, -1));
+      zkClient.multi(ops, retryOnConnLoss);
+    } else {
+      String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
+      try {
+        // the trailing 2 is meant to keep makePath from creating /collections/{collection} if they do not exist
+        zkClient.makePath(parentZNodePath, (byte[]) null, CreateMode.PERSISTENT, (Watcher) null, true, retryOnConnLoss, 2);
+      } catch (KeeperException.NodeExistsException nee) {
+        // if it exists, that's great!
+      }
+
+      // we only create the entry if the context we are using is registered as the current leader in ZK
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+      ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
+          CreateMode.PERSISTENT));
+      zkClient.multi(ops, retryOnConnLoss);
+    }
+  }
+  /** Builds the ZK path under which the LIR znodes of the replicas of one shard are kept. */
+  public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
+    return String.join("/", "/collections", collection, "leader_initiated_recovery", shardId);
+  }
+  /** Builds the ZK path of a single replica's LIR znode: the shard-level path plus its coreNodeName. */
+  public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
+    return String.join("/", getLeaderInitiatedRecoveryZnodePath(collection, shardId), coreNodeName);
+  }
+
   public void throwErrorIfReplicaReplaced(CoreDescriptor desc) {
     ClusterState clusterState = getZkStateReader().getClusterState();
     if (clusterState != null) {
@@ -2301,6 +2627,15 @@ public class ZkController {
     };
   }
 
+  /**
+   * Thrown during the leader-initiated recovery process when the current node is not (or is no longer) the leader.
+   */
+  public static class NotLeaderException extends SolrException  {
+    public NotLeaderException(ErrorCode code, String msg) {
+      super(code, msg);
+    }
+  }
+
   public boolean checkIfCoreNodeNameAlreadyExists(CoreDescriptor dcore) {
     DocCollection collection = zkStateReader.getClusterState().getCollectionOrNull(dcore.getCollectionName());
     if (collection != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index f61ed2c..30a7355 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -81,6 +81,10 @@ public class ZkShardTerms implements AutoCloseable{
     return terms.canBecomeLeader(coreNodeName);
   }
 
+  /** Returns true if a term is recorded for {@code coreNodeName}, i.e. {@code terms.getTerm} is non-null. */
+  public boolean registered(String coreNodeName) {
+    return terms.getTerm(coreNodeName) != null;
+  }
   public void close() {
     // no watcher will be registered
     numWatcher.addAndGet(1);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 020e3e9..aba32b4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -979,6 +979,16 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
             "The shard already has an active leader. Force leader is not applicable. State: " + slice);
       }
 
+      //TODO remove this in 8.0, SOLR-11812
+      // Clear out any LIR state
+      String lirPath = handler.coreContainer.getZkController().getLeaderInitiatedRecoveryZnodePath(collectionName, sliceId);
+      if (handler.coreContainer.getZkController().getZkClient().exists(lirPath, true)) {
+        StringBuilder sb = new StringBuilder();
+        handler.coreContainer.getZkController().getZkClient().printLayout(lirPath, 4, sb);
+        log.info("Cleaning out LIR data, which was: {}", sb);
+        handler.coreContainer.getZkController().getZkClient().clean(lirPath);
+      }
+
       // Call all live replicas to prepare themselves for leadership, e.g. set last published
       // state to active.
       for (Replica rep : slice.getReplicas()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index c724994..063a740 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -45,6 +45,7 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -184,6 +185,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   // are custom and may modify the SolrInputDocument racing with its serialization for replication
   private final boolean cloneRequiredOnLeader;
   private final Replica.Type replicaType;
+  // this flag, used for testing rolling updates, should be removed by SOLR-11812
+  private final boolean isOldLIRMode;
 
   public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
     this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@@ -203,6 +206,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
     this.vinfo = ulog == null ? null : ulog.getVersionInfo();
+    this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
     versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
     returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
 
@@ -344,11 +348,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
 
         List<Node> nodes = new ArrayList<>(replicaProps.size());
+        ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
         for (ZkCoreNodeProps props : replicaProps) {
           String coreNodeName = ((Replica) props.getNodeProps()).getName();
           if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
             log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
-          } else if(!zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+          } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
             log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
           } else {
             nodes.add(new StdNode(props, collection, shardId));
@@ -847,10 +852,23 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             // if false, then the node is probably not "live" anymore
             // and we do not need to send a recovery message
             Throwable rootCause = SolrException.getRootCause(error.e);
-            log.error("Setting up to try to start recovery on replica {} with url {}", coreNodeName, replicaUrl, rootCause);
-            ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
-            failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
-            failedReplicas.get(shardInfo).add(coreNodeName);
+            if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
+              log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
+              ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
+              failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
+              failedReplicas.get(shardInfo).add(coreNodeName);
+            } else {
+              // The replica did not register its term, so it must be running the old LIR implementation
+              log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+              zkController.ensureReplicaInLeaderInitiatedRecovery(
+                  req.getCore().getCoreContainer(),
+                  collection,
+                  shardId,
+                  stdNode.getNodeProps(),
+                  req.getCore().getCoreDescriptor(),
+                  false /* forcePublishState */
+              );
+            }
           } catch (Exception exc) {
             Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
             log.error("Leader failed to set replica " +
@@ -869,9 +887,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
       }
     }
-    for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
-      ShardInfo shardInfo = entry.getKey();
-      zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
+    if (!isOldLIRMode) {
+      for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
+        ShardInfo shardInfo = entry.getKey();
+        zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
+      }
     }
     // in either case, we need to attach the achieved and min rf to the response.
     if (leaderReplicationTracker != null || rollupReplicationTracker != null) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/917dbc65/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java b/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
new file mode 100644
index 0000000..a997167
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.solr.JSONTestUtil;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TestInjection;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LIRRollingUpdatesTest extends SolrCloudTestCase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Map<URI, SocketProxy> proxies;
+  private static Map<URI, JettySolrRunner> jettys;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(3)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    // Add proxies
+    proxies = new HashMap<>(cluster.getJettySolrRunners().size());
+    jettys = new HashMap<>(cluster.getJettySolrRunners().size());
+    for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+      SocketProxy proxy = new SocketProxy();
+      jetty.setProxyPort(proxy.getListenPort());
+      cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart
+      cluster.startJettySolrRunner(jetty);
+      proxy.open(jetty.getBaseUrl().toURI());
+      LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
+      proxies.put(proxy.getUrl(), proxy);
+      jettys.put(proxy.getUrl(), jetty);
+    }
+  }
+
+
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    for (SocketProxy proxy:proxies.values()) {
+      proxy.close();
+    }
+    proxies = null;
+    jettys = null;
+    TestInjection.reset();
+  }
+
+  @Test
+  public void testNewReplicaOldLeader() throws Exception {
+
+    String collection = "testNewReplicaOldLeader";
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+    addDocs(collection, 2, 0);
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    //introduce network partition between leader & replica
+    Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(shard1.getLeader()).close();
+
+    addDoc(collection, 2, getJettyForReplica(shard1.getLeader()));
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+    getProxyForReplica(shard1.getLeader()).reopen();
+    getProxyForReplica(notLeader).reopen();
+    // make sure that when a new replica works with an old leader, it can still recover normally
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
+
+    // make sure that when the new replica restarts during LIR, it can still recover normally (by looking at the LIR node)
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(shard1.getLeader()).close();
+
+    addDoc(collection, 3, getJettyForReplica(shard1.getLeader()));
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+
+    JettySolrRunner notLeaderJetty = getJettyForReplica(notLeader);
+    notLeaderJetty.stop();
+    waitForState("Node did not leave", collection, (liveNodes, collectionState) -> liveNodes.size() == 2);
+    upgrade(notLeaderJetty);
+    notLeaderJetty.start();
+
+    getProxyForReplica(shard1.getLeader()).reopen();
+    getProxyForReplica(notLeader).reopen();
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertFalse(runInOldLIRMode(collection, "shard1", notLeader));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 3);
+
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+  public void testNewLeaderOldReplica() throws Exception {
+    // in case of new leader & old replica, new leader can still put old replica into LIR
+
+    String collection = "testNewLeaderOldReplica";
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    waitForState("Timeout waiting for shard1 become active", collection, (liveNodes, collectionState) -> {
+      Slice shard1 = collectionState.getSlice("shard1");
+      if (shard1.getReplicas().size() == 1 && shard1.getLeader() != null) return true;
+      return false;
+    });
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    Replica leader = shard1.getLeader();
+
+    assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
+    assertFalse(runInOldLIRMode(collection, "shard1", leader));
+
+    addDocs(collection, 2, 0);
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(leader).close();
+
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    addDoc(collection, 2, leaderJetty);
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+    // wait a little bit
+    Thread.sleep(500);
+    getProxyForReplica(notLeader).reopen();
+    getProxyForReplica(leader).reopen();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
+
+    // ensure that after recovery, the upgraded replica cleans up its LIR status because it is no longer needed
+    assertFalse(cluster.getSolrClient().getZkStateReader().getZkClient().exists(
+        ZkController.getLeaderInitiatedRecoveryZnodePath(collection, "shard1", notLeader.getName()), true));
+    // ensure that the leader does not register another replica's term
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      assertFalse(zkShardTerms.getTerms().containsKey(notLeader.getName()));
+    }
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+  public void testLeaderAndMixedReplicas(boolean leaderInOldMode) throws Exception {
+    // with a leader (old or new mode, per leaderInOldMode) and a mix of old and new replicas, the leader can still put all of them into recovery
+    // step1 : setup collection
+    String collection = "testMixedReplicas-"+leaderInOldMode;
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    if (leaderInOldMode) {
+      CollectionAdminRequest
+          .addReplicaToShard(collection, "shard1")
+          .setProperties(oldLir)
+          .setNode(cluster.getJettySolrRunner(0).getNodeName())
+          .process(cluster.getSolrClient());
+    } else {
+      CollectionAdminRequest
+          .addReplicaToShard(collection, "shard1")
+          .setNode(cluster.getJettySolrRunner(0).getNodeName())
+          .process(cluster.getSolrClient());
+    }
+
+    waitForState("Timeout waiting for shard1 become active", collection, clusterShape(1, 1));
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setNode(cluster.getJettySolrRunner(2).getNodeName())
+        .process(cluster.getSolrClient());
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    Replica replicaInOldMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    Replica replicaInNewMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(1);
+    Replica leader = shard1.getLeader();
+
+    assertEquals(leaderInOldMode, runInOldLIRMode(collection, "shard1", leader));
+    if (!runInOldLIRMode(collection, "shard1", replicaInOldMode)) {
+      Replica temp = replicaInOldMode;
+      replicaInOldMode = replicaInNewMode;
+      replicaInNewMode = temp;
+    }
+    assertTrue(runInOldLIRMode(collection, "shard1", replicaInOldMode));
+    assertFalse(runInOldLIRMode(collection, "shard1", replicaInNewMode));
+
+    addDocs(collection, 2, 0);
+
+    // step2 : introduce network partition then add doc, replicas should be put into recovery
+    getProxyForReplica(replicaInOldMode).close();
+    getProxyForReplica(replicaInNewMode).close();
+    getProxyForReplica(leader).close();
+
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    addDoc(collection, 2, leaderJetty);
+
+    Replica finalReplicaInOldMode = replicaInOldMode;
+    waitForState("Replica " + replicaInOldMode.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(finalReplicaInOldMode.getName()).getState() == Replica.State.DOWN);
+
+    // wait a little bit
+    Thread.sleep(500);
+    getProxyForReplica(replicaInOldMode).reopen();
+    getProxyForReplica(replicaInNewMode).reopen();
+    getProxyForReplica(leader).reopen();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
+    assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 2);
+
+    addDocs(collection, 3, 3);
+
+    // ensure that the leader does not register another replica's term
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      assertFalse(zkShardTerms.getTerms().containsKey(replicaInOldMode.getName()));
+    }
+
+    // step3 : upgrade the replica running in old mode to the new mode
+    getProxyForReplica(leader).close();
+    getProxyForReplica(replicaInOldMode).close();
+    addDoc(collection, 6, leaderJetty);
+    JettySolrRunner oldJetty = getJettyForReplica(replicaInOldMode);
+    oldJetty.stop();
+    upgrade(oldJetty);
+
+    getProxyForReplica(leader).reopen();
+    getProxyForReplica(replicaInOldMode).reopen();
+
+    waitForState("Node did not leave", collection, (liveNodes, collectionState)
+        -> liveNodes.size() == 2);
+    oldJetty.start();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
+    assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 6);
+
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+  @Test
+  public void testNewLeaderAndMixedReplicas() throws Exception {
+    testLeaderAndMixedReplicas(false);
+  }
+
+  @Test
+  public void testOldLeaderAndMixedReplicas() throws Exception {
+    testLeaderAndMixedReplicas(true);
+  }
+
+  private void upgrade(JettySolrRunner solrRunner) {
+    File[] corePaths = new File(solrRunner.getSolrHome()).listFiles();
+    for (File corePath : corePaths) {
+      File coreProperties = new File(corePath, "core.properties");
+      if (!coreProperties.exists()) continue;
+      Properties properties = new Properties();
+
+      try (InputStream is = new FileInputStream(coreProperties)) {
+        properties.load(is);
+      } catch (Exception e) {
+        continue;
+      }
+      properties.remove("lirVersion");
+      try (OutputStream out = new FileOutputStream(coreProperties)) {
+        properties.store(out, "Upgraded");
+      } catch (Exception e) {
+        continue;
+      }
+    }
+  }
+
+  protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
+                                              String testCollectionName, int firstDocId, int lastDocId)
+      throws Exception {
+    Replica leader =
+        cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
+    HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
+    List<HttpSolrClient> replicas =
+        new ArrayList<HttpSolrClient>(notLeaders.size());
+
+    for (Replica r : notLeaders) {
+      replicas.add(getHttpSolrClient(r, testCollectionName));
+    }
+    try {
+      for (int d = firstDocId; d <= lastDocId; d++) {
+        String docId = String.valueOf(d);
+        assertDocExists(leaderSolr, testCollectionName, docId);
+        for (HttpSolrClient replicaSolr : replicas) {
+          assertDocExists(replicaSolr, testCollectionName, docId);
+        }
+      }
+    } finally {
+      if (leaderSolr != null) {
+        leaderSolr.close();
+      }
+      for (HttpSolrClient replicaSolr : replicas) {
+        replicaSolr.close();
+      }
+    }
+  }
+
+  protected void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
+    NamedList rsp = realTimeGetDocId(solr, docId);
+    String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
+    assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+        + " due to: " + match + "; rsp="+rsp, match == null);
+  }
+
+  private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
+    return solr.request(qr);
+  }
+
+  protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
+    ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
+    String url = zkProps.getBaseUrl() + "/" + coll;
+    return getHttpSolrClient(url);
+  }
+
+  private <T> void waitFor(int waitTimeInSecs, T expected, Supplier<T> supplier) throws InterruptedException {
+    TimeOut timeOut = new TimeOut(waitTimeInSecs, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+    while (!timeOut.hasTimedOut()) { // poll the supplier every 100 ms until it yields the expected value
+      if (java.util.Objects.equals(expected, supplier.get())) return; // value equality, consistent with assertEquals below
+      Thread.sleep(100);
+    }
+    assertEquals(expected, supplier.get()); // timed out: one last check, which fails the test on mismatch
+  }
+
+  private boolean runInOldLIRMode(String collection, String shard, Replica replica) {
+    try (ZkShardTerms shardTerms = new ZkShardTerms(collection, shard, cluster.getZkClient())) {
+      return !shardTerms.registered(replica.getName());
+    }
+  }
+
+  private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
+      solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
+    }
+  }
+
+  private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i = 0; i < numDocs; i++) {
+      int id = startId + i;
+      docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
+    }
+    cluster.getSolrClient().add(collection, docs);
+    cluster.getSolrClient().commit(collection);
+  }
+
+
+  protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    JettySolrRunner proxy = jettys.get(baseUrl.toURI());
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+}