You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/29 08:56:29 UTC

[2/2] lucene-solr:master: SOLR-11702: Redesign current LIR implementation

SOLR-11702: Redesign current LIR implementation


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/27ef6530
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/27ef6530
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/27ef6530

Branch: refs/heads/master
Commit: 27ef6530646a9af6f8fdf491afd80185bc4f7fee
Parents: 00d453d
Author: Cao Manh Dat <da...@apache.org>
Authored: Mon Jan 29 15:55:28 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Mon Jan 29 15:55:28 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   6 +
 .../client/solrj/embedded/JettySolrRunner.java  |  16 +-
 .../org/apache/solr/cloud/ElectionContext.java  |  48 +-
 .../cloud/LeaderInitiatedRecoveryThread.java    |   1 +
 .../solr/cloud/RecoveringCoreTermWatcher.java   |  75 +++
 .../org/apache/solr/cloud/RecoveryStrategy.java |  75 ++-
 .../apache/solr/cloud/ZkCollectionTerms.java    |  65 +++
 .../org/apache/solr/cloud/ZkController.java     | 112 +++--
 .../org/apache/solr/cloud/ZkShardTerms.java     | 475 +++++++++++++++++++
 .../api/collections/CreateCollectionCmd.java    |  18 +-
 .../solr/handler/admin/CollectionsHandler.java  |  45 +-
 .../solr/handler/admin/PrepRecoveryOp.java      |   7 +
 .../solr/update/DefaultSolrCoreState.java       |  19 +-
 .../processor/DistributedUpdateProcessor.java   |  86 +++-
 .../org/apache/solr/cloud/ForceLeaderTest.java  | 220 +++++++--
 .../solr/cloud/HttpPartitionOnCommitTest.java   | 178 +++++++
 .../apache/solr/cloud/HttpPartitionTest.java    | 179 +++++--
 .../solr/cloud/LIRRollingUpdatesTest.java       | 457 ++++++++++++++++++
 .../LeaderInitiatedRecoveryOnCommitTest.java    | 178 -------
 ...aderInitiatedRecoveryOnShardRestartTest.java |  12 +-
 .../TestLeaderInitiatedRecoveryThread.java      |   1 +
 .../org/apache/solr/cloud/ZkShardTermsTest.java | 204 ++++++++
 .../solr/update/TestInPlaceUpdatesDistrib.java  |  33 +-
 .../cloud/AbstractFullDistribZkTestBase.java    |   5 +-
 24 files changed, 2112 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2943f6c..57d40b3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -71,6 +71,10 @@ Upgrade Notes
   Before 7.3, the copied over configset was named the same as the collection name, but 7.3 onwards it will be named
   with an additional ".AUTOCREATED" suffix.
 
+* SOLR-11702: The old LIR implementation (SOLR-5495) is now deprecated and replaced.
+  Solr will support rolling upgrades from old 7.x versions of Solr to the new one until
+  the last release of the 7.x major version.
+
 New Features
 ----------------------
 * SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -113,6 +117,8 @@ New Features
 * SOLR-11617: Alias metadata is now mutable via a new MODIFYALIAS command.  Metadata is returned from LISTALIASES.
   (Gus Heck via David Smiley)
 
+* SOLR-11702: Redesign current LIR implementation (Cao Manh Dat, shalin)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index e5b81f8..23a8dc1 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,7 +84,7 @@ public class JettySolrRunner {
   FilterHolder debugFilter;
 
   private boolean waitOnSolr = false;
-  private int lastPort = -1;
+  private int jettyPort = -1;
 
   private final JettyConfig config;
   private final String solrHome;
@@ -280,8 +280,10 @@ public class JettySolrRunner {
       @Override
       public void lifeCycleStarted(LifeCycle arg0) {
 
-        lastPort = getFirstConnectorPort();
-        nodeProperties.setProperty("hostPort", Integer.toString(lastPort));
+        jettyPort = getFirstConnectorPort();
+        int port = jettyPort;
+        if (proxyPort != -1) port = proxyPort;
+        nodeProperties.setProperty("hostPort", Integer.toString(port));
         nodeProperties.setProperty("hostContext", config.context);
 
         root.getServletContext().setAttribute(SolrDispatchFilter.PROPERTIES_ATTRIBUTE, nodeProperties);
@@ -384,7 +386,7 @@ public class JettySolrRunner {
       // if started before, make a new server
       if (startedBefore) {
         waitOnSolr = false;
-        int port = reusePort ? lastPort : this.config.port;
+        int port = reusePort ? jettyPort : this.config.port;
         init(port);
       } else {
         startedBefore = true;
@@ -456,7 +458,7 @@ public class JettySolrRunner {
     if (0 == conns.length) {
       throw new RuntimeException("Jetty Server has no Connectors");
     }
-    return (proxyPort != -1) ? proxyPort : ((ServerConnector) conns[0]).getLocalPort();
+    return ((ServerConnector) conns[0]).getLocalPort();
   }
   
   /**
@@ -465,10 +467,10 @@ public class JettySolrRunner {
    * @exception RuntimeException if there is no Connector
    */
   public int getLocalPort() {
-    if (lastPort == -1) {
+    if (jettyPort == -1) {
       throw new IllegalStateException("You cannot get the port until this instance has started");
     }
-    return (proxyPort != -1) ? proxyPort : lastPort;
+    return (proxyPort != -1) ? proxyPort : jettyPort;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 7169ea8..2d00151 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Future;
@@ -491,7 +492,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             rejoinLeaderElection(core);
           }
         }
-        
+
         if (isLeader) {
           // check for any replicas in my shard that were set to down by the previous leader
           try {
@@ -530,6 +531,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     return docCollection.getReplica(replicaName);
   }
 
+  @Deprecated
   public void checkLIR(String coreName, boolean allReplicasInLine)
       throws InterruptedException, KeeperException, IOException {
     if (allReplicasInLine) {
@@ -551,7 +553,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
               leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
         }
       }
-      
+
     } else {
       try (SolrCore core = cc.getCore(coreName)) {
         if (core != null) {
@@ -567,7 +569,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       }
     }
   }
-  
+
+  @Deprecated
   private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
     try (SolrCore core = cc.getCore(coreName)) {
       CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
@@ -577,10 +580,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
 
       if (coll == null || shardId == null) {
         log.error("Cannot start leader-initiated recovery on new leader (core="+
-           coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
+            coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
         return;
       }
-      
+
       String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
       List<String> replicas = null;
       try {
@@ -588,21 +591,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       } catch (NoNodeException nne) {
         // this can be ignored
       }
-      
+
       if (replicas != null && replicas.size() > 0) {
         for (String replicaCoreNodeName : replicas) {
-          
+
           if (coreNodeName.equals(replicaCoreNodeName))
             continue; // added safe-guard so we don't mark this core as down
-          
+
+          if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+            // the replica registered its term so it is running with the new LIR implementation
+            // we can put this replica into recovery by increase our terms
+            zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
+            continue;
+          }
+
           final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
           if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
             log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
                 + lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
-            List<ZkCoreNodeProps> replicaProps =  
+            List<ZkCoreNodeProps> replicaProps =
                 zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
-            
-            if (replicaProps != null && replicaProps.size() > 0) {                
+
+            if (replicaProps != null && replicaProps.size() > 0) {
               ZkCoreNodeProps coreNodeProps = null;
               for (ZkCoreNodeProps p : replicaProps) {
                 if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
@@ -610,17 +620,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
                   break;
                 }
               }
-              
+
               zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
                   collection, shardId, coreNodeProps, core.getCoreDescriptor(),
                   false /* forcePublishState */);
-            }              
+            }
           }
         }
       }
-    } // core gets closed automagically    
+    } // core gets closed automagically
   }
 
+
   // returns true if all replicas are found to be up, false if not
   private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
@@ -743,7 +754,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       // to make sure others participate in sync and leader election, we can be leader
       return true;
     }
-    
+
+    String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+    if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+        && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+      log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
+      return false;
+    }
+
     if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
       log.debug("My last published State was Active, it's okay to be the leader.");
       return true;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
index 8c892ce..9c46236 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -45,6 +45,7 @@ import java.util.List;
  * replica; used by a shard leader to nag a replica into recovering after the
  * leader experiences an error trying to send an update request to the replica.
  */
+@Deprecated
 public class LeaderInitiatedRecoveryThread extends Thread {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
new file mode 100644
index 0000000..26fec97
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start recovery of a core if its term is less than leader's term
+ */
+public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final SolrCore solrCore;
+  // used to prevent the case when term of other replicas get changed, we redo recovery
+  // the idea here is with a specific term of a replica, we only do recovery one
+  private final AtomicLong lastTermDoRecovery;
+
+  RecoveringCoreTermWatcher(SolrCore solrCore) {
+    this.solrCore = solrCore;
+    this.lastTermDoRecovery = new AtomicLong(-1);
+  }
+
+  @Override
+  public boolean onTermChanged(ZkShardTerms.Terms terms) {
+    if (solrCore.isClosed()) {
+      return false;
+    }
+
+    if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
+
+    String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+    if (terms.canBecomeLeader(coreNodeName)) return true;
+    if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
+      log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
+      lastTermDoRecovery.set(terms.getTerm(coreNodeName));
+      solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
+
+    return solrCore.equals(that.solrCore);
+  }
+
+  @Override
+  public int hashCode() {
+    return solrCore.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 3ab4eca..63dfe19 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -458,7 +460,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           core.getCoreDescriptor());
       return;
     }
-    
+
     // we temporary ignore peersync for tlog replicas
     boolean firstTime = replicaType != Replica.Type.TLOG;
 
@@ -516,21 +518,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
       zkController.stopReplicationFromLeader(coreName);
     }
 
+    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     Future<RecoveryInfo> replayFuture = null;
     while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+        final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
 
-        boolean isLeader = leaderUrl.equals(ourUrl);
+        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
         if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
         }
@@ -541,12 +540,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
           zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
           return;
         }
-        
+
         LOG.info("Begin buffering updates. core=[{}]", coreName);
         ulog.bufferUpdates();
         replayed = false;
         
-        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
             ourUrl);
         zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
         
@@ -565,7 +564,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           break;
         }
 
-        sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
         
         if (isClosed()) {
           LOG.info("RecoveryStrategy has been closed");
@@ -585,11 +584,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
         // first thing we just try to sync
         if (firstTime) {
           firstTime = false; // only try sync the first time through the loop
-          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
           PeerSync peerSync = new PeerSync(core,
-              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+              Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
           peerSync.setStartingVersions(recentVersions);
           boolean syncSuccess = peerSync.sync().isSuccess();
           if (syncSuccess) {
@@ -623,7 +622,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         try {
 
-          replicate(zkController.getNodeName(), core, leaderprops);
+          replicate(zkController.getNodeName(), core, leader);
 
           if (isClosed()) {
             LOG.info("RecoveryStrategy has been closed");
@@ -745,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+    int numTried = 0;
+    while (true) {
+      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+      if (mayPutReplicaAsDown && numTried == 1 &&
+          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+        // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
+        zkController.publish(coreDesc, Replica.State.DOWN);
+      }
+      numTried++;
+      final Replica leaderReplica = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+      if (isClosed()) {
+        return leaderReplica;
+      }
+
+      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+        return leaderReplica;
+      }
+
+      try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
+          .withSocketTimeout(1000)
+          .withConnectionTimeout(1000)
+          .build()) {
+        SolrPingResponse resp = httpSolrClient.ping();
+        return leaderReplica;
+      } catch (IOException e) {
+        LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+        Thread.sleep(500);
+      } catch (Exception e) {
+        if (e.getCause() instanceof IOException) {
+          LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+          Thread.sleep(500);
+        } else {
+          return leaderReplica;
+        }
+      }
+    }
+  }
+
   public static Runnable testing_beforeReplayBufferingUpdates;
 
   final private Future<RecoveryInfo> replay(SolrCore core)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
new file mode 100644
index 0000000..b232f9b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.core.CoreDescriptor;
+
+/**
+ * Used to manage all ZkShardTerms of a collection
+ */
+class ZkCollectionTerms implements AutoCloseable {
+  private final String collection;
+  private final Map<String, ZkShardTerms> terms;
+  private final SolrZkClient zkClient;
+
+  ZkCollectionTerms(String collection, SolrZkClient client) {
+    this.collection = collection;
+    this.terms = new HashMap<>();
+    this.zkClient = client;
+    ObjectReleaseTracker.track(this);
+  }
+
+
+  public ZkShardTerms getShard(String shardId) {
+    synchronized (terms) {
+      if (!terms.containsKey(shardId)) terms.put(shardId, new ZkShardTerms(collection, shardId, zkClient));
+      return terms.get(shardId);
+    }
+  }
+
+  public void remove(String shardId, CoreDescriptor coreDescriptor) {
+    synchronized (terms) {
+      if (getShard(shardId).removeTerm(coreDescriptor)) {
+        terms.remove(shardId).close();
+      }
+    }
+  }
+
+  public void close() {
+    synchronized (terms) {
+      terms.values().forEach(ZkShardTerms::close);
+    }
+    ObjectReleaseTracker.release(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 365da65..7898e96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -193,7 +193,6 @@ public class ZkController {
   private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
 
   private final SolrZkClient zkClient;
-  private final ZkCmdExecutor cmdExecutor;
   public final ZkStateReader zkStateReader;
   private SolrCloudManager cloudManager;
   private CloudSolrClient cloudSolrClient;
@@ -210,6 +209,7 @@ public class ZkController {
   private LeaderElector overseerElector;
 
   private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
+  private final Map<String, ZkCollectionTerms> collectionToTerms = new HashMap<>();
 
   // for now, this can be null in tests, in which case recovery will be inactive, and other features
   // may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -226,6 +226,7 @@ public class ZkController {
 
   private volatile boolean isClosed;
 
+  @Deprecated
   // keeps track of replicas that have been asked to recover by leaders running on this node
   private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
 
@@ -323,7 +324,7 @@ public class ZkController {
           @Override
           public void command() {
             log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
-
+            clearZkCollectionTerms();
             try {
               zkStateReader.createClusterStateWatchersAndUpdate();
 
@@ -435,7 +436,6 @@ public class ZkController {
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
-    cmdExecutor = new ZkCmdExecutor(clientTimeout);
     zkStateReader = new ZkStateReader(zkClient, () -> {
       if (cc != null) cc.securityNodeChanged();
     });
@@ -547,6 +547,9 @@ public class ZkController {
    */
   public void close() {
     this.isClosed = true;
+    synchronized (collectionToTerms) {
+      collectionToTerms.values().forEach(ZkCollectionTerms::close);
+    }
     try {
       for (ElectionContext context : electionContexts.values()) {
         try {
@@ -1034,7 +1037,14 @@ public class ZkController {
       
       final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
       assert coreZkNodeName != null : "we should have a coreNodeName by now";
-      
+
+      ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+
+      // This flag is used for testing rolling updates and should be removed in SOLR-11812
+      boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
+      if (isRunningInNewLIR) {
+        shardTerms.registerTerm(coreZkNodeName);
+      }
       String shardId = cloudDesc.getShardId();
       Map<String,Object> props = new HashMap<>();
       // we only put a subset of props into the leader node
@@ -1118,15 +1128,17 @@ public class ZkController {
           }
         }
         boolean didRecovery
-            = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
+            = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, shardId, core, cc, afterExpiration);
         if (!didRecovery) {
           if (isTlogReplicaAndNotLeader) {
             startReplicationFromLeader(coreName, true);
           }
           publish(desc, Replica.State.ACTIVE);
         }
-        
-        
+
+        if (isRunningInNewLIR && replicaType != Type.PULL) {
+          shardTerms.addListener(new RecoveringCoreTermWatcher(core));
+        }
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
       }
       
@@ -1295,7 +1307,7 @@ public class ZkController {
    * Returns whether or not a recovery was started
    */
   private boolean checkRecovery(boolean recoverReloadedCores, final boolean isLeader, boolean skipRecovery,
-                                final String collection, String shardId,
+                                final String collection, String coreZkNodeName, String shardId,
                                 SolrCore core, CoreContainer cc, boolean afterExpiration) {
     if (SKIP_AUTO_RECOVERY) {
       log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
@@ -1322,6 +1334,13 @@ public class ZkController {
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
         return true;
       }
+
+      ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
+      if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
+        log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
+        core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+        return true;
+      }
     } else {
       log.info("I am the leader, no recovery necessary");
     }
@@ -1372,6 +1391,7 @@ public class ZkController {
       String shardId = cd.getCloudDescriptor().getShardId();
       
       String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
       // If the leader initiated recovery, then verify that this replica has performed
       // recovery as requested before becoming active; don't even look at lirState if going down
       if (state != Replica.State.DOWN) {
@@ -1394,7 +1414,7 @@ public class ZkController {
           }
         }
       }
-      
+
       Map<String,Object> props = new HashMap<>();
       props.put(Overseer.QUEUE_OPERATION, "state");
       props.put(ZkStateReader.STATE_PROP, state.toString());
@@ -1430,6 +1450,11 @@ public class ZkController {
         log.info("The core '{}' had failed to initialize before.", cd.getName());
       }
 
+      // This flag is used for testing rolling updates and should be removed in SOLR-11812
+      boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
+      if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
+        getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
+      }
       ZkNodeProps m = new ZkNodeProps(props);
       
       if (updateLastState) {
@@ -1441,23 +1466,28 @@ public class ZkController {
     }
   }
 
-  private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
-                                           final ClusterState state, final String coreNodeName) {
-
-    final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+  public ZkShardTerms getShardTerms(String collection, String shardId) {
+    return getCollectionTerms(collection).getShard(shardId);
+  }
 
-    final String shardId = state.getShardId(getNodeName(), desc.getName());
+  /** Lazily creates and returns the per-collection terms holder; thread-safe via the map lock. */
+  private ZkCollectionTerms getCollectionTerms(String collection) {
+    synchronized (collectionToTerms) {
+      // single lookup instead of containsKey + put + get
+      return collectionToTerms.computeIfAbsent(collection, col -> new ZkCollectionTerms(col, zkClient));
+    }
+  }
 
-    if (shardId != null) {
-      cloudDesc.setShardId(shardId);
-      return false;
+  public void clearZkCollectionTerms() {
+    synchronized (collectionToTerms) {
+      collectionToTerms.values().forEach(ZkCollectionTerms::close);
+      collectionToTerms.clear();
     }
-    return true;
   }
 
   public void unregister(String coreName, CoreDescriptor cd) throws Exception {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
+    getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
 
     if (Strings.isNullOrEmpty(collection)) {
       log.error("No collection was specified.");
@@ -1733,7 +1763,7 @@ public class ZkController {
     boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
     if (!isLeader && !SKIP_AUTO_RECOVERY) {
 
-      // detect if this core is in leader-initiated recovery and if so, 
+      // detect if this core is in leader-initiated recovery and if so,
       // then we don't need the leader to wait on seeing the down state
       Replica.State lirState = null;
       try {
@@ -1743,9 +1773,9 @@ public class ZkController {
             " is in leader-initiated recovery due to: " + exc, exc);
       }
 
-      if (lirState != null) {
-        log.debug("Replica " + myCoreNodeName +
-            " is already in leader-initiated recovery, so not waiting for leader to see down state.");
+      if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
+        log.debug("Term of replica " + myCoreNodeName +
+            " is already less than leader, so not waiting for leader to see down state.");
       } else {
 
         log.info("Replica " + myCoreNodeName +
@@ -2055,6 +2085,7 @@ public class ZkController {
    * false means the node is not live either, so no point in trying to send recovery commands
    * to it.
    */
+  @Deprecated
   public boolean ensureReplicaInLeaderInitiatedRecovery(
       final CoreContainer container,
       final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
@@ -2117,13 +2148,14 @@ public class ZkController {
                 " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
             replicaCoreProps.getCoreName(), replicaCoreNodeName);
         // publishDownState will be false to avoid publishing the "down" state too many times
-        // as many errors can occur together and will each call into this method (SOLR-6189)        
+        // as many errors can occur together and will each call into this method (SOLR-6189)
       }
     }
 
     return nodeIsLive;
   }
 
+  @Deprecated
   public boolean isReplicaInRecoveryHandling(String replicaUrl) {
     boolean exists = false;
     synchronized (replicasInLeaderInitiatedRecovery) {
@@ -2132,12 +2164,14 @@ public class ZkController {
     return exists;
   }
 
+  @Deprecated
   public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
     synchronized (replicasInLeaderInitiatedRecovery) {
       replicasInLeaderInitiatedRecovery.remove(replicaUrl);
     }
   }
 
+  @Deprecated
   public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
     final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
     if (stateObj == null) {
@@ -2147,6 +2181,7 @@ public class ZkController {
     return stateStr == null ? null : Replica.State.getState(stateStr);
   }
 
+  @Deprecated
   public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
 
     if (collection == null || shardId == null || coreNodeName == null)
@@ -2191,6 +2226,7 @@ public class ZkController {
     return stateObj;
   }
 
+  @Deprecated
   public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
       Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
@@ -2199,12 +2235,12 @@ public class ZkController {
           + "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
       return; // if we don't have complete data about a core in cloud mode, do nothing
     }
-    
+
     assert leaderCd != null;
     assert leaderCd.getCloudDescriptor() != null;
 
     String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
-    
+
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
 
     if (state == Replica.State.ACTIVE) {
@@ -2269,29 +2305,29 @@ public class ZkController {
   private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
                                        String znodePath, byte[] znodeData,
                                        boolean retryOnConnLoss) throws KeeperException, InterruptedException {
-    
+
 
     if (!leaderCd.getCloudDescriptor().isLeader()) {
       log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
       throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
     }
-    
+
     ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
     ElectionContext context = electionContexts.get(key);
-    
+
     // we make sure we locally think we are the leader before and after getting the context - then
     // we only try zk if we still think we are the leader and have our leader context
     if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
       log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
       throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
     }
-    
+
     // we think we are the leader - get the expected shard leader version
     // we use this version and multi to ensure *only* the current zk registered leader
     // for a shard can put a replica into LIR
-    
+
     Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
-    
+
     // TODO: should we do this optimistically to avoid races?
     if (zkClient.exists(znodePath, retryOnConnLoss)) {
       List<Op> ops = new ArrayList<>(2);
@@ -2306,7 +2342,7 @@ public class ZkController {
       } catch (KeeperException.NodeExistsException nee) {
         // if it exists, that's great!
       }
-      
+
       // we only create the entry if the context we are using is registered as the current leader in ZK
       List<Op> ops = new ArrayList<>(2);
       ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
@@ -2316,11 +2352,13 @@ public class ZkController {
     }
   }
 
-  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
+  @Deprecated
+  public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
     return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
   }
 
-  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
+  @Deprecated
+  public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
     return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
   }
 
@@ -2608,12 +2646,6 @@ public class ZkController {
     };
   }
 
-  public String getLeaderSeqPath(String collection, String coreNodeName) {
-    ContextKey key = new ContextKey(collection, coreNodeName);
-    ElectionContext context = electionContexts.get(key);
-    return context != null ? context.leaderSeqPath : null;
-  }
-
   /**
    * Thrown during leader initiated recovery process if current node is not leader
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
new file mode 100644
index 0000000..7dc0d57
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class used for interacting with a ZK term node.
+ * Each ZK term node relates to a shard of a collection and has this format (in json)
+ * <p>
+ * <code>
+ * {
+ *   "replicaNodeName1" : 1,
+ *   "replicaNodeName2" : 2,
+ *   ..
+ * }
+ * </code>
+ * <p>
+ * The values corresponding to replicas are called terms.
+ * Only replicas with the highest term value are considered up to date and are able to become leader and serve queries.
+ * <p>
+ * Terms can only be updated in two strict ways:
+ * <ul>
+ * <li>A replica sets its term equal to the leader's term
+ * <li>The leader increases its own term, and the terms of in-sync replicas, by 1
+ * </ul>
+ * This class should not be reused after {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
+ */
+public class ZkShardTerms implements AutoCloseable{
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Object writingLock = new Object();
+  private final String collection;
+  private final String shard;
+  private final String znodePath;
+  private final SolrZkClient zkClient;
+  private final Set<CoreTermWatcher> listeners = new HashSet<>();
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+  private Terms terms;
+
+  // Listener of a core for shard's term change events
+  interface CoreTermWatcher {
+    // return true if the listener wants to be triggered again on the next term change
+    boolean onTermChanged(Terms terms);
+  }
+
+  public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
+    this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
+    this.collection = collection;
+    this.shard = shard;
+    this.zkClient = zkClient;
+    ensureTermNodeExist();
+    refreshTerms();
+    retryRegisterWatcher();
+    ObjectReleaseTracker.track(this);
+  }
+
+  /**
+   * Ensure that leader's term is higher than some replica's terms
+   * @param leader coreNodeName of leader
+   * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
+   */
+  public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+    Terms newTerms;
+    while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
+      if (forceSaveTerms(newTerms)) return;
+    }
+  }
+
+  /**
+   * Can this replica become leader or is this replica's term equals to leader's term?
+   * @param coreNodeName of the replica
+   * @return true if this replica can become leader, false if otherwise
+   */
+  public boolean canBecomeLeader(String coreNodeName) {
+    return terms.canBecomeLeader(coreNodeName);
+  }
+
+  /**
+   * Has this replica registered its term yet?
+   * @param coreNodeName of the replica
+   * @return true if this replica registered its term, false if otherwise
+   */
+  public boolean registered(String coreNodeName) {
+    return terms.getTerm(coreNodeName) != null;
+  }
+
+  public void close() {
+    // no watcher will be registered
+    isClosed.set(true);
+    synchronized (listeners) {
+      listeners.clear();
+    }
+    ObjectReleaseTracker.release(this);
+  }
+
+  // package private for testing, only used by tests
+  Map<String, Long> getTerms() {
+    synchronized (writingLock) {
+      return new HashMap<>(terms.values);
+    }
+  }
+
+  /**
+   * Add a listener so the next time the shard's term get updated, listeners will be called
+   */
+  void addListener(CoreTermWatcher listener) {
+    synchronized (listeners) {
+      listeners.add(listener);
+    }
+  }
+
+  /**
+   * Remove the coreNodeName from terms map and also remove any expired listeners
+   * @return Return true if this object should not be reused
+   */
+  boolean removeTerm(CoreDescriptor cd) {
+    int numListeners;
+    synchronized (listeners) {
+      // solrcore already closed
+      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
+      numListeners = listeners.size();
+    }
+    Terms newTerms;
+    while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
+      try {
+        if (saveTerms(newTerms)) return numListeners == 0;
+      } catch (KeeperException.NoNodeException e) {
+        return true;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Register a replica's term (term value will be 0).
+   * If a term is already associated with this replica, do nothing
+   * @param coreNodeName of the replica
+   */
+  void registerTerm(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  /**
+   * Set a replica's term equals to leader's term
+   * @param coreNodeName of the replica
+   */
+  public void setEqualsToMax(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  public long getTerm(String coreNodeName) {
+    Long term = terms.getTerm(coreNodeName);
+    return term == null? -1 : term;
+  }
+
+  // package private for testing, only used by tests
+  int getNumListeners() {
+    synchronized (listeners) {
+      return listeners.size();
+    }
+  }
+
+  /**
+   * Set new terms to ZK.
+   * If the corresponding ZK term node does not exist, create it
+   * @param newTerms to be set
+   * @return true if terms is saved successfully to ZK, false if otherwise
+   */
+  private boolean forceSaveTerms(Terms newTerms) {
+    try {
+      return saveTerms(newTerms);
+    } catch (KeeperException.NoNodeException e) {
+      ensureTermNodeExist();
+      return false;
+    }
+  }
+
+  /**
+   * Set new terms to ZK, the version of new terms must match the current ZK term node
+   * @param newTerms to be set
+   * @return true if terms is saved successfully to ZK, false if otherwise
+   * @throws KeeperException.NoNodeException if the corresponding ZK term node does not exist
+   */
+  private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
+    byte[] znodeData = Utils.toJSON(newTerms.values);
+    try {
+      Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
+      setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+      return true;
+    } catch (KeeperException.BadVersionException e) {
+      log.info("Failed to save terms, version is not match, retrying");
+      refreshTerms();
+    } catch (KeeperException.NoNodeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error save shard term for collection:" + collection, e);
+    }
+    return false;
+  }
+
+  /**
+   * Create the ZK term node for this shard (and its parent node) if they do not exist yet.
+   * Losing a creation race to another node is expected and ignored.
+   */
+  private void ensureTermNodeExist() {
+    String path = "/collections/"+collection+ "/terms";
+    try {
+      if (!zkClient.exists(path, true)) {
+        try {
+          zkClient.makePath(path, true);
+        } catch (KeeperException.NodeExistsException e) {
+          // it's okay if another beats us creating the node
+        }
+      }
+      path += "/"+shard;
+      if (!zkClient.exists(path, true)) {
+        try {
+          Map<String, Long> initialTerms = new HashMap<>();
+          zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+        } catch (KeeperException.NodeExistsException e) {
+          // it's okay if another beats us creating the node
+        }
+      }
+    }  catch (InterruptedException e) {
+      // restore the interrupt status; Thread.interrupted() would have cleared it
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+    }
+  }
+
+  /**
+   * Fetch the latest terms from ZK and apply them locally (listeners fire if terms changed).
+   * @throws SolrException wrapping any ZK failure or interruption
+   */
+  public void refreshTerms() {
+    Terms newTerms;
+    try {
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData(znodePath, null, stat, true);
+      newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+    } catch (InterruptedException e) {
+      // restore the interrupt status; the original swapped this handling into the KeeperException catch
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+    }
+
+    setNewTerms(newTerms);
+  }
+
+  /**
+   * Retry register a watcher to the correspond ZK term node
+   */
+  private void retryRegisterWatcher() {
+    while (!isClosed.get()) {
+      try {
+        registerWatcher();
+        return;
+      } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
+        isClosed.set(true);
+        log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
+        return;
+      } catch (KeeperException e) {
+        log.warn("Failed watching shard term for collection:{}, retrying!", collection, e);
+        try {
+          zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
+        } catch (TimeoutException te) {
+          if (Thread.interrupted()) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, te);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Register a watcher on the shard's ZK term node. The watcher re-registers itself and
+   * refreshes terms on every change event (session events are ignored).
+   */
+  private void registerWatcher() throws KeeperException {
+    Watcher watcher = event -> {
+      // session events are not change events, and do not remove the watcher
+      if (Watcher.Event.EventType.None == event.getType()) {
+        return;
+      }
+      retryRegisterWatcher();
+      // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
+      refreshTerms();
+    };
+    try {
+      // exists operation is faster than getData operation
+      zkClient.exists(znodePath, watcher, true);
+    } catch (InterruptedException e) {
+      // restore the interrupt status; Thread.interrupted() would have cleared it
+      Thread.currentThread().interrupt();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, e);
+    }
+  }
+
+
+  /**
+   * Atomically update {@link ZkShardTerms#terms} and call listeners
+   * @param newTerms to be set
+   */
+  private void setNewTerms(Terms newTerms) {
+    boolean isChanged = false;
+    synchronized (writingLock) {
+      if (terms == null || newTerms.version > terms.version) {
+        terms = newTerms;
+        isChanged = true;
+      }
+    }
+    if (isChanged) onTermUpdates(newTerms);
+  }
+
+  private void onTermUpdates(Terms newTerms) {
+    synchronized (listeners) {
+      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
+    }
+  }
+
+  /**
+   * Hold values of terms, this class is immutable
+   */
+  static class Terms {
+    private final Map<String, Long> values;
+    // ZK node version
+    private final int version;
+
+    public Terms () {
+      this(new HashMap<>(), 0);
+    }
+
+    public Terms(Map<String, Long> values, int version) {
+      this.values = values;
+      this.version = version;
+    }
+
+    /**
+     * Can this replica become leader or is this replica's term equals to leader's term?
+     * @param coreNodeName of the replica
+     * @return true if this replica can become leader, false if otherwise
+     */
+    boolean canBecomeLeader(String coreNodeName) {
+      if (values.isEmpty()) return true;
+      long maxTerm = Collections.max(values.values());
+      return values.getOrDefault(coreNodeName, 0L) == maxTerm;
+    }
+
+    Long getTerm(String coreNodeName) {
+      return values.get(coreNodeName);
+    }
+
+    /**
+     * Return a new {@link Terms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
+     * @param leader coreNodeName of leader
+     * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
+     * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
+     */
+    Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
+      if (!values.containsKey(leader)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+      }
+
+      boolean changed = false;
+      boolean foundReplicasInLowerTerms = false;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      long leaderTerm = newValues.get(leader);
+      for (String replica : newValues.keySet()) {
+        if (replicasNeedingRecovery.contains(replica)) foundReplicasInLowerTerms = true;
+        if (Objects.equals(newValues.get(replica), leaderTerm)) {
+          if(replicasNeedingRecovery.contains(replica)) {
+            changed = true;
+          } else {
+            newValues.put(replica, leaderTerm+1);
+          }
+        }
+      }
+
+      // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
+      // this may indicate that the current value is stale
+      if (!changed && foundReplicasInLowerTerms) return null;
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Return a new {@link Terms} in which term of {@code coreNodeName} is removed
+     * @param coreNodeName of the replica
+     * @return null if term of {@code coreNodeName} is already not exist
+     */
+    Terms removeTerm(String coreNodeName) {
+      if (!values.containsKey(coreNodeName)) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.remove(coreNodeName);
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Return a new {@link Terms} in which the associate term of {@code coreNodeName} is not null
+     * @param coreNodeName of the replica
+     * @return null if term of {@code coreNodeName} is already exist
+     */
+    Terms registerTerm(String coreNodeName) {
+      if (values.containsKey(coreNodeName)) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.put(coreNodeName, 0L);
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Return a new {@link Terms} in which the term of {@code coreNodeName} is the maximum
+     * @param coreNodeName of the replica
+     * @return null if the term of {@code coreNodeName} is already the maximum
+     */
+    Terms setEqualsToMax(String coreNodeName) {
+      // explicit empty check instead of catching NoSuchElementException from Collections.max
+      long maxTerm = values.isEmpty() ? 0 : Collections.max(values.values());
+      // getOrDefault avoids an NPE unboxing a missing entry; -1 can never equal a real term (terms start at 0)
+      if (values.getOrDefault(coreNodeName, -1L) == maxTerm) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.put(coreNodeName, maxTerm);
+      return new Terms(newValues, version);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4c6ce47..428ad83 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
@@ -392,7 +393,22 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
   public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
     log.debug("Check for collection zkNode:" + collection);
     String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
-
+    // clean up old terms node
+    String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms";
+    try {
+      if (stateManager.hasData(termsPath)) {
+        List<String> paths = stateManager.listData(termsPath);
+        for (String path : paths) {
+          stateManager.removeData(termsPath + "/" + path, -1);
+        }
+        stateManager.removeData(termsPath, -1);
+      }
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+    } catch (KeeperException | IOException | BadVersionException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+    }
     try {
       if (!stateManager.hasData(collectionPath)) {
         log.debug("Creating collection in ZooKeeper:" + collection);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index cebb2d0..dcc3de6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,8 +28,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.function.BiConsumer;
 
 import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -1067,7 +1070,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
   }
 
   private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
-    ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+    ZkController zkController = handler.coreContainer.getZkController();
+    ClusterState clusterState = zkController.getClusterState();
     String collectionName = req.getParams().required().get(COLLECTION_PROP);
     String sliceId = req.getParams().required().get(SHARD_ID_PROP);
 
@@ -1079,7 +1083,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           "No shard with name " + sliceId + " exists for collection " + collectionName);
     }
 
-    try {
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
       // if an active replica is the leader, then all is fine already
       Replica leader = slice.getLeader();
       if (leader != null && leader.getState() == State.ACTIVE) {
@@ -1096,20 +1100,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         handler.coreContainer.getZkController().getZkClient().clean(lirPath);
       }
 
+      final Set<String> liveNodes = clusterState.getLiveNodes();
+      List<Replica> liveReplicas = slice.getReplicas().stream()
+          .filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
+      boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
+          .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
+      // we won't increase the replicas' terms if there exists a live replica whose term equals the leader's
+      if (shouldIncreaseReplicaTerms) {
+        OptionalLong optionalMaxTerm = liveReplicas.stream()
+            .filter(rep -> zkShardTerms.registered(rep.getName()))
+            .mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
+            .max();
+        // increase the terms of the least out-of-sync replicas (those already at the max term)
+        if (optionalMaxTerm.isPresent()) {
+          liveReplicas.stream()
+              .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
+              .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+        }
+      }
+
       // Call all live replicas to prepare themselves for leadership, e.g. set last published
       // state to active.
-      for (Replica rep : slice.getReplicas()) {
-        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
-          ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+      for (Replica rep : liveReplicas) {
+        ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
 
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
-          params.set(CoreAdminParams.CORE, rep.getStr("core"));
-          String nodeName = rep.getNodeName();
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+        params.set(CoreAdminParams.CORE, rep.getStr("core"));
+        String nodeName = rep.getNodeName();
 
-          OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
-              CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
-        }
+        OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+            CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
       }
 
       // Wait till we have an active leader

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0a6d5ce..3647735 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.Objects;
 
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -124,6 +125,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
                 log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
               }
 
+              ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
+              // if the replica is waiting for the leader to see its recovering state, the leader should refresh its terms
+              if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+                shardTerms.refreshTerms();
+              }
+
               boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
               log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
                   ", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index b418a19..739604f 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -308,19 +308,20 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
           // after the current one, and if there is, bail
           boolean locked = recoveryLock.tryLock();
           try {
-            if (!locked) {
-              if (recoveryWaiting.get() > 0) {
-                return;
-              }
-              recoveryWaiting.incrementAndGet();
-            } else {
-              recoveryWaiting.incrementAndGet();
-              cancelRecovery();
+            if (!locked && recoveryWaiting.get() > 0) {
+              return;
             }
+
+            recoveryWaiting.incrementAndGet();
+            cancelRecovery();
             
             recoveryLock.lock();
             try {
-              recoveryWaiting.decrementAndGet();
+              // don't use recoveryLock.getQueueLength() for this
+              if (recoveryWaiting.decrementAndGet() > 0) {
+                // another recovery waiting behind us, let it run now instead of after we finish
+                return;
+              }
               
               // to be air tight we must also check after lock
               if (cc.isShutDown()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index de031a2..3cff171 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +45,7 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -184,6 +186,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private final boolean cloneRequiredOnLeader;
   private final Replica.Type replicaType;
 
+  @Deprecated
+  // this flag, used for testing rolling updates, should be removed by SOLR-11812
+  private final boolean isOldLIRMode;
+
   public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
     this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
   }
@@ -202,6 +208,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
     this.vinfo = ulog == null ? null : ulog.getVersionInfo();
+    this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
     versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
     returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
 
@@ -343,13 +350,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
 
         List<Node> nodes = new ArrayList<>(replicaProps.size());
+        ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
         for (ZkCoreNodeProps props : replicaProps) {
-          if (skipList != null) {
-            boolean skip = skipListSet.contains(props.getCoreUrl());
-            log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
-            if (!skip) {
-              nodes.add(new StdNode(props, collection, shardId));
-            }
+          String coreNodeName = ((Replica) props.getNodeProps()).getName();
+          if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
+            log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
+          } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
+            log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
           } else {
             nodes.add(new StdNode(props, collection, shardId));
           }
@@ -751,7 +758,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // TODO - we may need to tell about more than one error...
 
     List<Error> errorsForClient = new ArrayList<>(errors.size());
-    
+    Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
     for (final SolrCmdDistributor.Error error : errors) {
       
       if (error.req.node instanceof RetryNode) {
@@ -843,18 +850,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             && foundErrorNodeInReplicaList // we found an error for one of replicas
             && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
           try {
+            String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
             // if false, then the node is probably not "live" anymore
             // and we do not need to send a recovery message
             Throwable rootCause = SolrException.getRootCause(error.e);
-            log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
-            zkController.ensureReplicaInLeaderInitiatedRecovery(
-                req.getCore().getCoreContainer(),
-                collection,
-                shardId,
-                stdNode.getNodeProps(),
-                req.getCore().getCoreDescriptor(),
-                false /* forcePublishState */
-            );
+            if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
+              log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
+              ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
+              failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
+              failedReplicas.get(shardInfo).add(coreNodeName);
+            } else {
+              // The replica did not register its term, so it must be running the old LIR implementation
+              log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+              zkController.ensureReplicaInLeaderInitiatedRecovery(
+                  req.getCore().getCoreContainer(),
+                  collection,
+                  shardId,
+                  stdNode.getNodeProps(),
+                  req.getCore().getCoreDescriptor(),
+                  false /* forcePublishState */
+              );
+            }
           } catch (Exception exc) {
             Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
             log.error("Leader failed to set replica " +
@@ -873,6 +889,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
       }
     }
+    if (!isOldLIRMode) {
+      for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
+        ShardInfo shardInfo = entry.getKey();
+        zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
+      }
+    }
     // in either case, we need to attach the achieved and min rf to the response.
     if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
       int achievedRf = Integer.MAX_VALUE;
@@ -905,6 +927,38 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
+  private class ShardInfo {
+    private String collection;
+    private String shard;
+    private String leader;
+
+    public ShardInfo(String collection, String shard, String leader) {
+      this.collection = collection;
+      this.shard = shard;
+      this.leader = leader;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ShardInfo shardInfo = (ShardInfo) o;
+
+      if (!collection.equals(shardInfo.collection)) return false;
+      if (!shard.equals(shardInfo.shard)) return false;
+      return leader.equals(shardInfo.leader);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = collection.hashCode();
+      result = 31 * result + shard.hashCode();
+      result = 31 * result + leader.hashCode();
+      return result;
+    }
+  }
+
  
   // must be synchronized by bucket
   private void doLocalAdd(AddUpdateCommand cmd) throws IOException {