You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/29 08:58:07 UTC

[1/2] lucene-solr:branch_7x: SOLR-11702: Redesign current LIR implementation

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 16e80e6a3 -> 8c8d78a4b


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 89ff67a..46ecdb6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
@@ -70,18 +71,166 @@ public class ForceLeaderTest extends HttpPartitionTest {
 
   }
 
+  /**
+   * Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live
+   */
+  @Test
+  @Slow
+  public void testReplicasInLowerTerms() throws Exception {
+    handle.put("maxScore", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+
+    String testCollectionName = "forceleader_lower_terms_collection";
+    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    try {
+      List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+      assertEquals("Expected 2 replicas for collection " + testCollectionName
+          + " but found " + notLeaders.size() + "; clusterState: "
+          + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+      Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+      JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+      ZkController zkController = notLeader0.getCoreContainer().getZkController();
+
+      log.info("Before put non leaders into lower term: " + printClusterStateInfo());
+      putNonLeadersIntoLowerTerm(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+      for (Replica replica : notLeaders) {
+        waitForState(testCollectionName, replica.getName(), State.DOWN, 60000);
+      }
+      waitForState(testCollectionName, leader.getName(), State.DOWN, 60000);
+      cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+      ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+      int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+          "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+      int numReplicasOnLiveNodes = 0;
+      for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
+        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+          numReplicasOnLiveNodes++;
+        }
+      }
+      assertEquals(2, numReplicasOnLiveNodes);
+      log.info("Before forcing leader: " + printClusterStateInfo());
+      // Assert there is no leader yet
+      assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
+          clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
+
+      assertSendDocFails(3);
+
+      log.info("Do force leader...");
+      doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+      // By now we have an active leader. Wait for recoveries to begin
+      waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+      cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+      clusterState = cloudClient.getZkStateReader().getClusterState();
+      log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
+      // we have a leader
+      Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
+      assertNotNull(newLeader);
+      // leader is active
+      assertEquals(State.ACTIVE, newLeader.getState());
+
+      numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals(2, numActiveReplicas);
+
+      // Assert that indexing works again
+      log.info("Sending doc 4...");
+      sendDoc(4);
+      log.info("Committing...");
+      cloudClient.commit();
+      log.info("Doc 4 sent and commit issued");
+
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+      // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+      log.info("Checking doc counts...");
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add("q", "*:*");
+      assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+      bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+    } finally {
+      log.info("Cleaning up after the test.");
+      // try to clean up
+      attemptCollectionDelete(cloudClient, testCollectionName);
+    }
+  }
+
+  void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+    SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+    for (int i = 0; i < notLeaders.size(); i++)
+      nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+
+    sendDoc(1);
+
+    // ok, now introduce a network partition between the leader and both replicas
+    log.info("Closing proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.close();
+    getProxyForReplica(leader).close();
+
+    // indexing during a partition
+    log.info("Sending a doc during the network partition...");
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    sendDoc(2, null, leaderJetty);
+    
+    for (Replica replica : notLeaders) {
+      waitForState(collectionName, replica.getName(), State.DOWN, 60000);
+    }
+
+    // Kill the leader
+    log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName() + "");
+    leaderJetty.stop();
+
+    // Wait for a steady state, till the shard is leaderless
+    log.info("Sleep and periodically wake up to check for state...");
+    for (int i = 0; i < 20; i++) {
+      ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+      boolean allDown = true;
+      for (Replica replica : clusterState.getCollection(collectionName).getSlice(shard).getReplicas()) {
+        if (replica.getState() != State.DOWN) {
+          allDown = false;
+        }
+      }
+      if (allDown && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    log.info("Waking up...");
+
+    // remove the network partition
+    log.info("Reopening the proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.reopen();
+
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, shard, cloudClient.getZkStateReader().getZkClient())) {
+      for (Replica notLeader : notLeaders) {
+        assertTrue(zkShardTerms.getTerm(leader.getName()) > zkShardTerms.getTerm(notLeader.getName()));
+      }
+    }
+  }
+
   /***
    * Tests that FORCELEADER can get an active leader after leader puts all replicas in LIR and itself goes down,
    * hence resulting in a leaderless shard.
    */
   @Test
   @Slow
+  //TODO remove in SOLR-11812
   public void testReplicasInLIRNoLeader() throws Exception {
     handle.put("maxScore", SKIPVAL);
     handle.put("timestamp", SKIPVAL);
 
     String testCollectionName = "forceleader_test_collection";
-    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    createOldLirCollection(testCollectionName, 3);
     cloudClient.setDefaultCollection(testCollectionName);
 
     try {
@@ -157,6 +306,28 @@ public class ForceLeaderTest extends HttpPartitionTest {
     }
   }
 
+  private void createOldLirCollection(String collection, int numReplicas) throws IOException, SolrServerException {
+    if (onlyLeaderIndexes) {
+      CollectionAdminRequest
+          .createCollection(collection, "conf1", 1, 0, numReplicas, 0)
+          .setCreateNodeSet("")
+          .process(cloudClient);
+    } else {
+      CollectionAdminRequest.createCollection(collection, "conf1", 1, numReplicas)
+          .setCreateNodeSet("")
+          .process(cloudClient);
+    }
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+    for (int i = 0; i < numReplicas; i++) {
+      // this is the only way to create replicas which run in old lir implementation
+      CollectionAdminRequest
+          .addReplicaToShard(collection, "shard1", onlyLeaderIndexes? Replica.Type.TLOG: Replica.Type.NRT)
+          .setProperties(oldLir)
+          .process(cloudClient);
+    }
+  }
+
   /**
    * Test that FORCELEADER can set last published state of all down (live) replicas to active (so
    * that they become worthy candidates for leader election).
@@ -167,7 +338,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
     handle.put("timestamp", SKIPVAL);
 
     String testCollectionName = "forceleader_last_published";
-    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    createOldLirCollection(testCollectionName, 3);
     cloudClient.setDefaultCollection(testCollectionName);
     log.info("Collection created: " + testCollectionName);
 
@@ -204,33 +375,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
     }
   }
 
-  protected void unsetLeader(String collection, String slice) throws Exception {
-    ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
-    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
-        ZkStateReader.SHARD_ID_PROP, slice,
-        ZkStateReader.COLLECTION_PROP, collection);
-    inQueue.offer(Utils.toJSON(m));
-
-    ClusterState clusterState = null;
-    boolean transition = false;
-    for (int counter = 10; counter > 0; counter--) {
-      clusterState = zkStateReader.getClusterState();
-      Replica newLeader = clusterState.getCollection(collection).getSlice(slice).getLeader();
-      if (newLeader == null) {
-        transition = true;
-        break;
-      }
-      Thread.sleep(1000);
-    }
-
-    if (!transition) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not unset replica leader" +
-          ". Cluster state: " + printClusterStateInfo(collection));
-    }
-  }
-
   protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws Exception {
     DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
     ZkStateReader zkStateReader = cloudClient.getZkStateReader();
@@ -263,23 +407,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
           ". Last known state of the replica: " + replicaState);
     }
   }
-  
-  /*protected void setLastPublishedState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
-  KeeperException, InterruptedException {
-    ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-    String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
-
-    ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
-    params.set(CoreAdminParams.CORE, replica.getStr("core"));
-    params.set(ZkStateReader.STATE_PROP, state.toString());
-
-    SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
-    NamedList resp = null;
-    try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
-       resp = hsc.request(req);
-    }
-  }*/
 
   protected Replica.State getLastPublishedState(String collection, String slice, Replica replica) throws SolrServerException, IOException,
   KeeperException, InterruptedException {
@@ -377,6 +504,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
     // Bring back the leader which was stopped
     log.info("Bringing back originally killed leader...");
     JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    getProxyForReplica(leader).reopen();
     leaderJetty.start();
     waitForRecoveriesToFinish(collection, cloudClient.getZkStateReader(), true);
     cloudClient.getZkStateReader().forceUpdateCollection(collection);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
new file mode 100644
index 0000000..1580661
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.util.RTimer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
+public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final long sleepMsBeforeHealPartition = 2000L;
+
+  private final boolean onlyLeaderIndexes = random().nextBoolean();
+
+  public HttpPartitionOnCommitTest() {
+    super();
+    sliceCount = 1;
+    fixShardCount(4);
+  }
+
+  @Override
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
+  }
+
+  @Override
+  @Test
+  public void test() throws Exception {
+    oneShardTest();
+    multiShardTest();
+  }
+
+  private void multiShardTest() throws Exception {
+
+    log.info("Running multiShardTest");
+
+    // create a collection that has 2 shard and 2 replicas
+    String testCollectionName = "c8n_2x2_commits";
+    createCollection(testCollectionName, "conf1", 2, 2, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
+    assertTrue("Expected 1 replicas for collection " + testCollectionName
+            + " but found " + notLeaders.size() + "; clusterState: "
+            + printClusterStateInfo(),
+        notLeaders.size() == 1);
+
+    log.info("All replicas active for "+testCollectionName);
+
+    // let's put the leader in its own partition, no replicas can contact it now
+    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    log.info("Creating partition to leader at "+leader.getCoreUrl());
+    SocketProxy leaderProxy = getProxyForReplica(leader);
+    leaderProxy.close();
+
+    // let's find the leader of shard2 and ask him to commit
+    Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
+    sendCommitWithRetry(shard2Leader);
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
+    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+
+    log.info("Healing partitioned replica at "+leader.getCoreUrl());
+    leaderProxy.reopen();
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    // try to clean up
+    attemptCollectionDelete(cloudClient, testCollectionName);
+
+    log.info("multiShardTest completed OK");
+  }
+
+  private void oneShardTest() throws Exception {
+    log.info("Running oneShardTest");
+
+    // create a collection that has 1 shard and 3 replicas
+    String testCollectionName = "c8n_1x3_commits";
+    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+            + " but found " + notLeaders.size() + "; clusterState: "
+            + printClusterStateInfo(),
+        notLeaders.size() == 2);
+
+    log.info("All replicas active for "+testCollectionName);
+
+    // let's put the leader in its own partition, no replicas can contact it now
+    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    log.info("Creating partition to leader at "+leader.getCoreUrl());
+    SocketProxy leaderProxy = getProxyForReplica(leader);
+    leaderProxy.close();
+
+    Replica replica = notLeaders.get(0);
+    sendCommitWithRetry(replica);
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
+    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+
+    log.info("Healing partitioned replica at "+leader.getCoreUrl());
+    leaderProxy.reopen();
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    // try to clean up
+    attemptCollectionDelete(cloudClient, testCollectionName);
+
+    log.info("oneShardTest completed OK");
+  }
+
+  /**
+   * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
+   */
+  @Override
+  public JettySolrRunner createJetty(File solrHome, String dataDir,
+                                     String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
+      throws Exception {
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
+  }
+
+  protected void sendCommitWithRetry(Replica replica) throws Exception {
+    String replicaCoreUrl = replica.getCoreUrl();
+    log.info("Sending commit request to: "+replicaCoreUrl);
+    final RTimer timer = new RTimer();
+    try (HttpSolrClient client = getHttpSolrClient(replicaCoreUrl)) {
+      try {
+        client.commit();
+
+        log.info("Sent commit request to {} OK, took {}ms", replicaCoreUrl, timer.getTime());
+      } catch (Exception exc) {
+        Throwable rootCause = SolrException.getRootCause(exc);
+        if (rootCause instanceof NoHttpResponseException) {
+          log.warn("No HTTP response from sending commit request to "+replicaCoreUrl+
+              "; will re-try after waiting 3 seconds");
+          Thread.sleep(3000);
+          client.commit();
+          log.info("Second attempt at sending commit to "+replicaCoreUrl+" succeeded.");
+        } else {
+          throw exc;
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 58ebbf9..a18aa31 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -49,15 +50,21 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
 import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
+import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
+
 /**
  * Simulates HTTP partitions between a leader and replica but the replica does
  * not lose its ZooKeeper connection.
@@ -121,6 +128,8 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
     testLeaderInitiatedRecoveryCRUD();
 
+    testDoRecoveryOnRestart();
+
     // Tests that if we set a minRf that's not satisfied, no recovery is requested, but if minRf is satisfied,
     // recovery is requested
     testMinRf();
@@ -147,8 +156,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   }
 
   /**
-   * Tests handling of lir state znodes.
+   * Tests handling of different format of lir nodes
    */
+  //TODO remove in SOLR-11812
   protected void testLeaderInitiatedRecoveryCRUD() throws Exception {
     String testCollectionName = "c8n_crud_1x2";
     String shardId = "shard1";
@@ -182,11 +192,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
         };
       }
     };
-    
-    zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, cd, true);
+
+    zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), DOWN, cd, true);
     Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);
-    assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
+    assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
 
     // test old non-json format handling
     SolrZkClient zkClient = zkController.getZkClient();
@@ -194,13 +204,64 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     zkClient.setData(znodePath, "down".getBytes(StandardCharsets.UTF_8), true);
     lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);
-    assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
+    assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
     zkClient.delete(znodePath, -1, false);
 
     // try to clean up
     attemptCollectionDelete(cloudClient, testCollectionName);
   }
 
+  private void testDoRecoveryOnRestart() throws Exception {
+    String testCollectionName = "collDoRecoveryOnRestart";
+    try {
+      // Inject pausing in recovery op, hence the replica won't be able to finish recovery
+      System.setProperty("solr.cloud.wait-for-updates-with-stale-state-pause", String.valueOf(Integer.MAX_VALUE));
+
+      createCollection(testCollectionName, "conf1", 1, 2, 1);
+      cloudClient.setDefaultCollection(testCollectionName);
+
+      sendDoc(1, 2);
+
+      JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
+      List<Replica> notLeaders =
+          ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+
+      SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+      SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
+
+      proxy0.close();
+      leaderProxy.close();
+
+      // indexing during a partition
+      int achievedRf = sendDoc(2, 1, leaderJetty);
+      assertEquals("Unexpected achieved replication factor", 1, achievedRf);
+      try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
+        assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
+      }
+      waitForState(testCollectionName, notLeaders.get(0).getName(), DOWN, 10000);
+
+      // heal partition
+      proxy0.reopen();
+      leaderProxy.reopen();
+
+      waitForState(testCollectionName, notLeaders.get(0).getName(), RECOVERING, 10000);
+
+      System.clearProperty("solr.cloud.wait-for-updates-with-stale-state-pause");
+      JettySolrRunner notLeaderJetty = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+      ChaosMonkey.stop(notLeaderJetty);
+
+      ChaosMonkey.start(notLeaderJetty);
+      ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, 100);
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 2);
+    } finally {
+      System.clearProperty("solr.cloud.wait-for-updates-with-stale-state-pause");
+    }
+
+    // try to clean up
+    attemptCollectionDelete(cloudClient, testCollectionName);
+  }
+
   protected void testMinRf() throws Exception {
     // create a collection that has 1 shard and 3 replicas
     String testCollectionName = "collMinRf_1x3";
@@ -209,6 +270,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
     sendDoc(1, 2);
 
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
     List<Replica> notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
     assertTrue("Expected 2 non-leader replicas for collection " + testCollectionName
@@ -221,27 +283,21 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     // Now introduce a network partition between the leader and 1 replica, so a minRf of 2 is still achieved
     log.info("partitioning replica :  " + notLeaders.get(0));
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+    SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
 
     proxy0.close();
+    // leader still can connect to replica 2, by closing leaderProxy, replica 1 can not do recovery
+    leaderProxy.close();
 
     // indexing during a partition
-    int achievedRf = sendDoc(2, 2);
+    int achievedRf = sendDoc(2, 2, leaderJetty);
     assertEquals("Unexpected achieved replication factor", 2, achievedRf);
-
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
+      assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
+    }
     Thread.sleep(sleepMsBeforeHealPartition);
-
-    // Verify that the partitioned replica is DOWN
-    ZkStateReader zkr = cloudClient.getZkStateReader();
-    zkr.forceUpdateCollection(testCollectionName);; // force the state to be fresh
-    ClusterState cs = zkr.getClusterState();
-    Collection<Slice> slices = cs.getCollection(testCollectionName).getActiveSlices();
-    Slice slice = slices.iterator().next();
-    Replica partitionedReplica = slice.getReplica(notLeaders.get(0).getName());
-    assertEquals("The partitioned replica did not get marked down",
-        Replica.State.DOWN.toString(), partitionedReplica.getStr(ZkStateReader.STATE_PROP));
-    log.info("un-partitioning replica :  " + notLeaders.get(0));
-
     proxy0.reopen();
+    leaderProxy.reopen();
 
     notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -254,8 +310,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     proxy0.close();
     SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
     proxy1.close();
+    leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
+    leaderProxy.close();
 
-    achievedRf = sendDoc(3, 2);
+    achievedRf = sendDoc(3, 2, leaderJetty);
     assertEquals("Unexpected achieved replication factor", 1, achievedRf);
 
     Thread.sleep(sleepMsBeforeHealPartition);
@@ -265,6 +323,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
     proxy0.reopen();
     proxy1.reopen();
+    leaderProxy.reopen();
 
     notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -299,30 +358,32 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     
     Replica notLeader = 
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
-    
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
+
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy = getProxyForReplica(notLeader);
+    SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
     
     proxy.close();
-    
+    leaderProxy.close();
+
     // indexing during a partition
-    sendDoc(2);
-    
-    // Have the partition last at least 1 sec
-    // While this gives the impression that recovery is timing related, this is
-    // really only
-    // to give time for the state to be written to ZK before the test completes.
-    // In other words,
-    // without a brief pause, the test finishes so quickly that it doesn't give
-    // time for the recovery process to kick-in
-    Thread.sleep(sleepMsBeforeHealPartition);
+    sendDoc(2, null, leaderJetty);
+    // replica should publish itself as DOWN if the network is not healed after some amount time
+    waitForState(testCollectionName, notLeader.getName(), DOWN, 10000);
     
     proxy.reopen();
+    leaderProxy.reopen();
     
     List<Replica> notLeaders = 
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
     
-    sendDoc(3);
+    int achievedRf = sendDoc(3);
+    if (achievedRf == 1) {
+      // this case can happen when leader reuse an connection get established before network partition
+      // TODO: Remove when SOLR-11776 get committed
+      ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+    }
     
     // sent 3 docs in so far, verify they are on the leader and replica
     assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
@@ -349,21 +410,25 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       if (d % 10 == 0) {
         if (hasPartition) {
           proxy.reopen();
+          leaderProxy.reopen();
           hasPartition = false;
         } else {
           if (d >= 10) {
             proxy.close();
+            leaderProxy.close();
             hasPartition = true;
             Thread.sleep(sleepMsBeforeHealPartition);
           }
         }
       }
-      sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
+      // always send doc directly to leader without going through proxy
+      sendDoc(d + 4, null, leaderJetty); // 4 is offset as we've already indexed 1-3
     }
     
     // restore connectivity if lost
     if (hasPartition) {
       proxy.reopen();
+      leaderProxy.reopen();
     }
     
     notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
@@ -384,7 +449,24 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     // try to clean up
     attemptCollectionDelete(cloudClient, testCollectionName);
   }
-  
+
+  protected void waitForState(String collection, String replicaName, Replica.State state, long ms) throws KeeperException, InterruptedException {
+    TimeOut timeOut = new TimeOut(ms, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
+    Replica.State replicaState = Replica.State.ACTIVE;
+    while (!timeOut.hasTimedOut()) {
+      ZkStateReader zkr = cloudClient.getZkStateReader();
+      zkr.forceUpdateCollection(collection);; // force the state to be fresh
+      ClusterState cs = zkr.getClusterState();
+      Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
+      Slice slice = slices.iterator().next();
+      Replica partitionedReplica = slice.getReplica(replicaName);
+      replicaState = partitionedReplica.getState();
+      if (replicaState == state) return;
+    }
+    assertEquals("Timeout waiting for state "+ state +" of replica " + replicaName + ", current state " + replicaState,
+        state, replicaState);
+  }
+
   protected void testRf3() throws Exception {
     // create a collection that has 1 shard but 2 replicas
     String testCollectionName = "c8n_1x3";
@@ -400,27 +482,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
         + " but found " + notLeaders.size() + "; clusterState: "
         + printClusterStateInfo(testCollectionName),
         notLeaders.size() == 2);
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
 
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+    SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
     
     proxy0.close();
+    leaderProxy.close();
     
     // indexing during a partition
-    sendDoc(2);
+    sendDoc(2, null, leaderJetty);
     
     Thread.sleep(sleepMsBeforeHealPartition);
-    
     proxy0.reopen();
     
     SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-    
     proxy1.close();
     
-    sendDoc(3);
+    sendDoc(3, null, leaderJetty);
     
     Thread.sleep(sleepMsBeforeHealPartition);
     proxy1.reopen();
+
+    leaderProxy.reopen();
     
     // sent 4 docs in so far, verify they are on the leader and replica
     notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive); 
@@ -578,8 +663,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   protected int sendDoc(int docId) throws Exception {
     return sendDoc(docId, null);
   }
-  
+
+  // Send doc directly to a server (without going through proxy)
+  protected int sendDoc(int docId, Integer minRf, JettySolrRunner leaderJetty) throws IOException, SolrServerException {
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder(leaderJetty.getBaseUrl().toString()).build()) {
+      return sendDoc(docId, minRf, solrClient, cloudClient.getDefaultCollection());
+    }
+  }
+
   protected int sendDoc(int docId, Integer minRf) throws Exception {
+    return sendDoc(docId, minRf, cloudClient, cloudClient.getDefaultCollection());
+  }
+
+  protected int sendDoc(int docId, Integer minRf, SolrClient solrClient, String collection) throws IOException, SolrServerException {
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField(id, String.valueOf(docId));
     doc.addField("a_t", "hello" + docId);
@@ -589,8 +685,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
     }
     up.add(doc);
-
-    return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
+    return cloudClient.getMinAchievedReplicationFactor(collection, solrClient.request(up, collection));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java b/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
new file mode 100644
index 0000000..a15406e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.solr.JSONTestUtil;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LIRRollingUpdatesTest extends SolrCloudTestCase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Map<URI, SocketProxy> proxies;
+  private static Map<URI, JettySolrRunner> jettys;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(3)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    // Add proxies
+    proxies = new HashMap<>(cluster.getJettySolrRunners().size());
+    jettys = new HashMap<>(cluster.getJettySolrRunners().size());
+    for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+      SocketProxy proxy = new SocketProxy();
+      jetty.setProxyPort(proxy.getListenPort());
+      cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart
+      cluster.startJettySolrRunner(jetty);
+      proxy.open(jetty.getBaseUrl().toURI());
+      LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
+      proxies.put(proxy.getUrl(), proxy);
+      jettys.put(proxy.getUrl(), jetty);
+    }
+  }
+
+
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    for (SocketProxy proxy:proxies.values()) {
+      proxy.close();
+    }
+    proxies = null;
+    jettys = null;
+  }
+
+  @Test
+  public void testNewReplicaOldLeader() throws Exception {
+
+    String collection = "testNewReplicaOldLeader";
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+    addDocs(collection, 2, 0);
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    //introduce network partition between leader & replica
+    Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(shard1.getLeader()).close();
+
+    addDoc(collection, 2, getJettyForReplica(shard1.getLeader()));
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+    getProxyForReplica(shard1.getLeader()).reopen();
+    getProxyForReplica(notLeader).reopen();
+    // make sure that, when new replica works with old leader, it still can recovery normally
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
+
+    // make sure that, when new replica restart during LIR, it still can recovery normally (by looking at LIR node)
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(shard1.getLeader()).close();
+
+    addDoc(collection, 3, getJettyForReplica(shard1.getLeader()));
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+
+    JettySolrRunner notLeaderJetty = getJettyForReplica(notLeader);
+    notLeaderJetty.stop();
+    waitForState("Node did not leave", collection, (liveNodes, collectionState) -> liveNodes.size() == 2);
+    upgrade(notLeaderJetty);
+    notLeaderJetty.start();
+
+    getProxyForReplica(shard1.getLeader()).reopen();
+    getProxyForReplica(notLeader).reopen();
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertFalse(runInOldLIRMode(collection, "shard1", notLeader));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 3);
+
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+  public void testNewLeaderOldReplica() throws Exception {
+    // in case of new leader & old replica, new leader can still put old replica into LIR
+
+    String collection = "testNewLeaderOldReplica";
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    waitForState("Timeout waiting for shard1 become active", collection, (liveNodes, collectionState) -> {
+      Slice shard1 = collectionState.getSlice("shard1");
+      if (shard1.getReplicas().size() == 1 && shard1.getLeader() != null) return true;
+      return false;
+    });
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    Replica leader = shard1.getLeader();
+
+    assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
+    assertFalse(runInOldLIRMode(collection, "shard1", leader));
+
+    addDocs(collection, 2, 0);
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(leader).close();
+
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    addDoc(collection, 2, leaderJetty);
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+    // wait a little bit
+    Thread.sleep(500);
+    getProxyForReplica(notLeader).reopen();
+    getProxyForReplica(leader).reopen();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
+
+    // ensure that after recovery, the upgraded replica will clean its LIR status cause it is no longer needed
+    assertFalse(cluster.getSolrClient().getZkStateReader().getZkClient().exists(
+        ZkController.getLeaderInitiatedRecoveryZnodePath(collection, "shard1", notLeader.getName()), true));
+    // ensure that, leader should not register other replica's term
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      assertFalse(zkShardTerms.getTerms().containsKey(notLeader.getName()));
+    }
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+  public void testLeaderAndMixedReplicas(boolean leaderInOldMode) throws Exception {
+    // in case of new leader and mixed old replica and new replica, new leader can still put all of them into recovery
+    // step1 : setup collection
+    String collection = "testMixedReplicas-"+leaderInOldMode;
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    if (leaderInOldMode) {
+      CollectionAdminRequest
+          .addReplicaToShard(collection, "shard1")
+          .setProperties(oldLir)
+          .setNode(cluster.getJettySolrRunner(0).getNodeName())
+          .process(cluster.getSolrClient());
+    } else {
+      CollectionAdminRequest
+          .addReplicaToShard(collection, "shard1")
+          .setNode(cluster.getJettySolrRunner(0).getNodeName())
+          .process(cluster.getSolrClient());
+    }
+
+    waitForState("Timeout waiting for shard1 become active", collection, clusterShape(1, 1));
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setNode(cluster.getJettySolrRunner(2).getNodeName())
+        .process(cluster.getSolrClient());
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    Replica replicaInOldMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    Replica replicaInNewMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(1);
+    Replica leader = shard1.getLeader();
+
+    assertEquals(leaderInOldMode, runInOldLIRMode(collection, "shard1", leader));
+    if (!runInOldLIRMode(collection, "shard1", replicaInOldMode)) {
+      Replica temp = replicaInOldMode;
+      replicaInOldMode = replicaInNewMode;
+      replicaInNewMode = temp;
+    }
+    assertTrue(runInOldLIRMode(collection, "shard1", replicaInOldMode));
+    assertFalse(runInOldLIRMode(collection, "shard1", replicaInNewMode));
+
+    addDocs(collection, 2, 0);
+
+    // step2 : introduce network partition then add doc, replicas should be put into recovery
+    getProxyForReplica(replicaInOldMode).close();
+    getProxyForReplica(replicaInNewMode).close();
+    getProxyForReplica(leader).close();
+
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    addDoc(collection, 2, leaderJetty);
+
+    Replica finalReplicaInOldMode = replicaInOldMode;
+    waitForState("Replica " + replicaInOldMode.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(finalReplicaInOldMode.getName()).getState() == Replica.State.DOWN);
+
+    // wait a little bit
+    Thread.sleep(500);
+    getProxyForReplica(replicaInOldMode).reopen();
+    getProxyForReplica(replicaInNewMode).reopen();
+    getProxyForReplica(leader).reopen();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
+    assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 2);
+
+    addDocs(collection, 3, 3);
+
+    // ensure that, leader should not register other replica's term
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      assertFalse(zkShardTerms.getTerms().containsKey(replicaInOldMode.getName()));
+    }
+
+    // step3 : upgrade the replica running in old mode to the new mode
+    getProxyForReplica(leader).close();
+    getProxyForReplica(replicaInOldMode).close();
+    addDoc(collection, 6, leaderJetty);
+    JettySolrRunner oldJetty = getJettyForReplica(replicaInOldMode);
+    oldJetty.stop();
+    waitForState("Node did not leave", collection, (liveNodes, collectionState)
+        -> liveNodes.size() == 2);
+    upgrade(oldJetty);
+
+    oldJetty.start();
+    getProxyForReplica(leader).reopen();
+    getProxyForReplica(replicaInOldMode).reopen();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
+    assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 6);
+
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+  @Test
+  public void testNewLeaderAndMixedReplicas() throws Exception {
+    testLeaderAndMixedReplicas(false);
+  }
+
+  @Test
+  public void testOldLeaderAndMixedReplicas() throws Exception {
+    testLeaderAndMixedReplicas(true);
+  }
+
+  private void upgrade(JettySolrRunner solrRunner) {
+    File[] corePaths = new File(solrRunner.getSolrHome()).listFiles();
+    for (File corePath : corePaths) {
+      File coreProperties = new File(corePath, "core.properties");
+      if (!coreProperties.exists()) continue;
+      Properties properties = new Properties();
+
+      try (Reader reader = new InputStreamReader(new FileInputStream(coreProperties), "UTF-8")) {
+        properties.load(reader);
+      } catch (Exception e) {
+        continue;
+      }
+      properties.remove("lirVersion");
+      try (Writer writer = new OutputStreamWriter(new FileOutputStream(coreProperties), "UTF-8")) {
+        properties.store(writer, "Upgraded");
+      } catch (Exception e) {
+        continue;
+      }
+    }
+  }
+
+  protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
+                                              String testCollectionName, int firstDocId, int lastDocId)
+      throws Exception {
+    Replica leader =
+        cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
+    HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
+    List<HttpSolrClient> replicas =
+        new ArrayList<HttpSolrClient>(notLeaders.size());
+
+    for (Replica r : notLeaders) {
+      replicas.add(getHttpSolrClient(r, testCollectionName));
+    }
+    try {
+      for (int d = firstDocId; d <= lastDocId; d++) {
+        String docId = String.valueOf(d);
+        assertDocExists(leaderSolr, testCollectionName, docId);
+        for (HttpSolrClient replicaSolr : replicas) {
+          assertDocExists(replicaSolr, testCollectionName, docId);
+        }
+      }
+    } finally {
+      if (leaderSolr != null) {
+        leaderSolr.close();
+      }
+      for (HttpSolrClient replicaSolr : replicas) {
+        replicaSolr.close();
+      }
+    }
+  }
+
+  protected void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
+    NamedList rsp = realTimeGetDocId(solr, docId);
+    String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
+    assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+        + " due to: " + match + "; rsp="+rsp, match == null);
+  }
+
+  private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
+    return solr.request(qr);
+  }
+
+  protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
+    ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
+    String url = zkProps.getBaseUrl() + "/" + coll;
+    return getHttpSolrClient(url);
+  }
+
+  private <T> void waitFor(int waitTimeInSecs, T expected, Supplier<T> supplier) throws InterruptedException {
+    TimeOut timeOut = new TimeOut(waitTimeInSecs, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+    while (!timeOut.hasTimedOut()) {
+      if (expected == supplier.get()) return;
+      Thread.sleep(100);
+    }
+    assertEquals(expected, supplier.get());
+  }
+
+  private boolean runInOldLIRMode(String collection, String shard, Replica replica) {
+    try (ZkShardTerms shardTerms = new ZkShardTerms(collection, shard, cluster.getZkClient())) {
+      return !shardTerms.registered(replica.getName());
+    }
+  }
+
+  private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
+      solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
+    }
+  }
+
+  private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i = 0; i < numDocs; i++) {
+      int id = startId + i;
+      docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
+    }
+    cluster.getSolrClient().add(collection, docs);
+    cluster.getSolrClient().commit(collection);
+  }
+
+
+  protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    JettySolrRunner proxy = jettys.get(baseUrl.toURI());
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
deleted file mode 100644
index b4e4860..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import org.apache.http.NoHttpResponseException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.util.RTimer;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-
-public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private static final long sleepMsBeforeHealPartition = 2000L;
-
-  private final boolean onlyLeaderIndexes = random().nextBoolean();
-
-  public LeaderInitiatedRecoveryOnCommitTest() {
-    super();
-    sliceCount = 1;
-    fixShardCount(4);
-  }
-
-  @Override
-  protected boolean useTlogReplicas() {
-    return onlyLeaderIndexes;
-  }
-
-  @Override
-  @Test
-  public void test() throws Exception {
-    oneShardTest();
-    multiShardTest();
-  }
-
-  private void multiShardTest() throws Exception {
-
-    log.info("Running multiShardTest");
-
-    // create a collection that has 2 shard and 2 replicas
-    String testCollectionName = "c8n_2x2_commits";
-    createCollection(testCollectionName, "conf1", 2, 2, 1);
-    cloudClient.setDefaultCollection(testCollectionName);
-
-    List<Replica> notLeaders =
-        ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
-    assertTrue("Expected 1 replicas for collection " + testCollectionName
-            + " but found " + notLeaders.size() + "; clusterState: "
-            + printClusterStateInfo(),
-        notLeaders.size() == 1);
-
-    log.info("All replicas active for "+testCollectionName);
-
-    // let's put the leader in its own partition, no replicas can contact it now
-    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
-    log.info("Creating partition to leader at "+leader.getCoreUrl());
-    SocketProxy leaderProxy = getProxyForReplica(leader);
-    leaderProxy.close();
-
-    // let's find the leader of shard2 and ask him to commit
-    Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
-    sendCommitWithRetry(shard2Leader);
-
-    Thread.sleep(sleepMsBeforeHealPartition);
-
-    cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
-    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
-    assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
-
-    log.info("Healing partitioned replica at "+leader.getCoreUrl());
-    leaderProxy.reopen();
-    Thread.sleep(sleepMsBeforeHealPartition);
-
-    // try to clean up
-    attemptCollectionDelete(cloudClient, testCollectionName);
-
-    log.info("multiShardTest completed OK");
-  }
-
-  private void oneShardTest() throws Exception {
-    log.info("Running oneShardTest");
-
-    // create a collection that has 1 shard and 3 replicas
-    String testCollectionName = "c8n_1x3_commits";
-    createCollection(testCollectionName, "conf1", 1, 3, 1);
-    cloudClient.setDefaultCollection(testCollectionName);
-
-    List<Replica> notLeaders =
-        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
-    assertTrue("Expected 2 replicas for collection " + testCollectionName
-            + " but found " + notLeaders.size() + "; clusterState: "
-            + printClusterStateInfo(),
-        notLeaders.size() == 2);
-
-    log.info("All replicas active for "+testCollectionName);
-
-    // let's put the leader in its own partition, no replicas can contact it now
-    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
-    log.info("Creating partition to leader at "+leader.getCoreUrl());
-    SocketProxy leaderProxy = getProxyForReplica(leader);
-    leaderProxy.close();
-
-    Replica replica = notLeaders.get(0);
-    sendCommitWithRetry(replica);
-    Thread.sleep(sleepMsBeforeHealPartition);
-
-    cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
-    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
-    assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
-
-    log.info("Healing partitioned replica at "+leader.getCoreUrl());
-    leaderProxy.reopen();
-    Thread.sleep(sleepMsBeforeHealPartition);
-
-    // try to clean up
-    attemptCollectionDelete(cloudClient, testCollectionName);
-
-    log.info("oneShardTest completed OK");
-  }
-
-  /**
-   * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
-   */
-  @Override
-  public JettySolrRunner createJetty(File solrHome, String dataDir,
-                                     String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
-      throws Exception {
-    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
-  }
-
-  protected void sendCommitWithRetry(Replica replica) throws Exception {
-    String replicaCoreUrl = replica.getCoreUrl();
-    log.info("Sending commit request to: "+replicaCoreUrl);
-    final RTimer timer = new RTimer();
-    try (HttpSolrClient client = getHttpSolrClient(replicaCoreUrl)) {
-      try {
-        client.commit();
-
-        log.info("Sent commit request to {} OK, took {}ms", replicaCoreUrl, timer.getTime());
-      } catch (Exception exc) {
-        Throwable rootCause = SolrException.getRootCause(exc);
-        if (rootCause instanceof NoHttpResponseException) {
-          log.warn("No HTTP response from sending commit request to "+replicaCoreUrl+
-              "; will re-try after waiting 3 seconds");
-          Thread.sleep(3000);
-          client.commit();
-          log.info("Second attempt at sending commit to "+replicaCoreUrl+" succeeded.");
-        } else {
-          throw exc;
-        }
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
index 8064513..12bde17 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.lucene.util.LuceneTestCase.Nightly;
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -26,6 +27,7 @@ import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
 @Slow
 @Nightly
 @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10071")
+@Deprecated
 public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistribZkTestBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
@@ -86,7 +89,14 @@ public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistr
     
     String testCollectionName = "all_in_lir";
     String shardId = "shard1";
-    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    CollectionAdminRequest.createCollection(testCollectionName, "conf1", 1, 3)
+        .setCreateNodeSet("")
+        .process(cloudClient);
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+    for (int i = 0; i < 3; i++) {
+      CollectionAdminRequest.addReplicaToShard(testCollectionName, "shard1").setProperties(oldLir).process(cloudClient);
+    }
     
     waitForRecoveriesToFinish(testCollectionName, false);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
index 0fbc0a1..f099fc6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.data.Stat;
 /**
  * Test for {@link LeaderInitiatedRecoveryThread}
  */
+@Deprecated
 @SolrTestCaseJ4.SuppressSSL
 public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
new file mode 100644
index 0000000..c205a50
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TimeOut;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkShardTermsTest extends SolrCloudTestCase {
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+  }
+
+  public void testParticipationOfReplicas() throws IOException, SolrServerException, InterruptedException {
+    String collection = "collection1";
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard2", cluster.getZkClient())) {
+      zkShardTerms.registerTerm("replica1");
+      zkShardTerms.registerTerm("replica2");
+      zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
+    }
+
+    // When new collection is created, the old term nodes will be removed
+    CollectionAdminRequest.createCollection(collection, 2, 2)
+        .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
+        .setMaxShardsPerNode(1000)
+        .process(cluster.getSolrClient());
+    ZkController zkController = cluster.getJettySolrRunners().get(0).getCoreContainer().getZkController();
+    waitFor(2, () -> zkController.getShardTerms(collection, "shard1").getTerms().size());
+    assertArrayEquals(new Long[]{0L, 0L}, zkController.getShardTerms(collection, "shard1").getTerms().values().toArray(new Long[2]));
+    waitFor(2, () -> zkController.getShardTerms(collection, "shard2").getTerms().size());
+    assertArrayEquals(new Long[]{0L, 0L}, zkController.getShardTerms(collection, "shard2").getTerms().values().toArray(new Long[2]));
+  }
+
+  public void testRegisterTerm() throws InterruptedException {
+    String collection = "registerTerm";
+    ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+    ZkShardTerms rep2Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+
+    rep1Terms.registerTerm("rep1");
+    rep2Terms.registerTerm("rep2");
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      assertEquals(0L, zkShardTerms.getTerm("rep1"));
+      assertEquals(0L, zkShardTerms.getTerm("rep2"));
+    }
+    waitFor(2, () -> rep1Terms.getTerms().size());
+    rep1Terms.ensureTermsIsHigher("rep1", Collections.singleton("rep2"));
+    assertEquals(1L, rep1Terms.getTerm("rep1"));
+    assertEquals(0L, rep1Terms.getTerm("rep2"));
+
+    // assert registerTerm does not override current value
+    rep1Terms.registerTerm("rep1");
+    assertEquals(1L, rep1Terms.getTerm("rep1"));
+
+    waitFor(1L, () -> rep2Terms.getTerm("rep1"));
+    rep2Terms.setEqualsToMax("rep2");
+    assertEquals(1L, rep2Terms.getTerm("rep2"));
+    rep2Terms.registerTerm("rep2");
+    assertEquals(1L, rep2Terms.getTerm("rep2"));
+
+    // zkShardTerms must stay updated by watcher
+    Map<String, Long> expectedTerms = new HashMap<>();
+    expectedTerms.put("rep1", 1L);
+    expectedTerms.put("rep2", 1L);
+
+    TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+    while (!timeOut.hasTimedOut()) {
+      if (Objects.equals(expectedTerms, rep1Terms.getTerms()) && Objects.equals(expectedTerms, rep2Terms.getTerms())) break;
+    }
+    if (timeOut.hasTimedOut()) fail("Expected zkShardTerms must stay updated");
+
+    rep1Terms.close();
+    rep2Terms.close();
+  }
+
+  @Test
+  public void testRaceConditionOnUpdates() throws InterruptedException {
+    String collection = "raceConditionOnUpdates";
+    List<String> replicas = Arrays.asList("rep1", "rep2", "rep3", "rep4");
+    for (String replica : replicas) {
+      try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+        zkShardTerms.registerTerm(replica);
+      }
+    }
+
+    List<String> failedReplicas = new ArrayList<>(replicas);
+    Collections.shuffle(failedReplicas, random());
+    while (failedReplicas.size() > 2) {
+      failedReplicas.remove(0);
+    }
+    AtomicBoolean stop = new AtomicBoolean(false);
+    Thread[] threads = new Thread[failedReplicas.size()];
+    for (int i = 0; i < failedReplicas.size(); i++) {
+      String replica = failedReplicas.get(i);
+      threads[i] = new Thread(() -> {
+        try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+          while (!stop.get()) {
+            try {
+              Thread.sleep(random().nextInt(200));
+              zkShardTerms.setEqualsToMax(replica);
+            } catch (InterruptedException e) {
+              e.printStackTrace();
+            }
+          }
+        }
+      });
+      threads[i].start();
+    }
+
+    long maxTerm = 0;
+    try (ZkShardTerms shardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      shardTerms.registerTerm("leader");
+      TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+      while (!timeOut.hasTimedOut()) {
+        maxTerm++;
+        assertEquals(shardTerms.getTerms().get("leader"), Collections.max(shardTerms.getTerms().values()));
+        Thread.sleep(100);
+      }
+      assertTrue(maxTerm >= Collections.max(shardTerms.getTerms().values()));
+    }
+    stop.set(true);
+    for (Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  public void testCoreTermWatcher() throws InterruptedException {
+    String collection = "coreTermWatcher";
+    ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+    leaderTerms.registerTerm("leader");
+    ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+    AtomicInteger count = new AtomicInteger(0);
+    // this will get called for almost 3 times
+    ZkShardTerms.CoreTermWatcher watcher = terms -> count.incrementAndGet() < 3;
+    replicaTerms.addListener(watcher);
+    replicaTerms.registerTerm("replica");
+    waitFor(1, count::get);
+    leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+    waitFor(2, count::get);
+    replicaTerms.setEqualsToMax("replica");
+    waitFor(3, count::get);
+    assertEquals(0, replicaTerms.getNumListeners());
+
+    leaderTerms.close();
+    replicaTerms.close();
+  }
+
+  public void testEnsureTermsIsHigher() {
+    Map<String, Long> map = new HashMap<>();
+    map.put("leader", 0L);
+    ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0);
+    terms = terms.increaseTerms("leader", Collections.singleton("replica"));
+    assertEquals(1L, terms.getTerm("leader").longValue());
+  }
+
+  private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
+    TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+    while (!timeOut.hasTimedOut()) {
+      if (expected == supplier.get()) return;
+      Thread.sleep(100);
+    }
+    assertEquals(expected, supplier.get());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
index 9a24c2a..14f0a7c 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -42,7 +42,7 @@ import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
 import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
-import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrInputDocument;
@@ -908,23 +908,20 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
 
       commit();
 
-      // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
-      // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
-      ZkController zkController = shardToLeaderJetty.get(SHARD1).jetty.getCoreContainer().getZkController();
-      String lirPath = zkController.getLeaderInitiatedRecoveryZnodePath(DEFAULT_TEST_COLLECTION_NAME, SHARD1);
-      assertFalse (zkController.getZkClient().exists(lirPath, true));
-
-      for (int i=0; i<100; i++) {
-        Thread.sleep(10);
-        cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
-        ClusterState state = cloudClient.getZkStateReader().getClusterState();
-
-        int numActiveReplicas = 0;
-        for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
-          if (rep.getState().equals(Replica.State.ACTIVE))
-            numActiveReplicas++;
-
-        assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
+      try (ZkShardTerms zkShardTerms = new ZkShardTerms(DEFAULT_COLLECTION, SHARD1, cloudClient.getZkStateReader().getZkClient())) {
+        for (int i=0; i<100; i++) {
+          Thread.sleep(10);
+          cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
+          ClusterState state = cloudClient.getZkStateReader().getClusterState();
+
+          int numActiveReplicas = 0;
+          for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) {
+            assertTrue(zkShardTerms.canBecomeLeader(rep.getName()));
+            if (rep.getState().equals(Replica.State.ACTIVE))
+              numActiveReplicas++;
+          }
+          assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
+        }
       }
 
       for (SolrClient client: new SolrClient[] {LEADER, NONLEADERS.get(0), 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index b031393..be2b7db 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -1963,6 +1963,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
     long waitMs = 0L;
     long maxWaitMs = maxWaitSecs * 1000L;
     Replica leader = null;
+    ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, shardId, cloudClient.getZkStateReader().getZkClient());
     while (waitMs < maxWaitMs && !allReplicasUp) {
       cs = cloudClient.getZkStateReader().getClusterState();
       assertNotNull(cs);
@@ -1981,7 +1982,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
 
       // ensure all replicas are "active" and identify the non-leader replica
       for (Replica replica : replicas) {
-        if (replica.getState() != Replica.State.ACTIVE) {
+        if (!zkShardTerms.canBecomeLeader(replica.getName()) ||
+            replica.getState() != Replica.State.ACTIVE) {
           log.info("Replica {} is currently {}", replica.getName(), replica.getState());
           allReplicasUp = false;
         }
@@ -1998,6 +2000,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
       }
     } // end while
 
+    zkShardTerms.close();
     if (!allReplicasUp)
       fail("Didn't see all replicas for shard "+shardId+" in "+testCollectionName+
           " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());


[2/2] lucene-solr:branch_7x: SOLR-11702: Redesign current LIR implementation

Posted by da...@apache.org.
SOLR-11702: Redesign current LIR implementation


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8c8d78a4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8c8d78a4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8c8d78a4

Branch: refs/heads/branch_7x
Commit: 8c8d78a4bb6c0f3322471af5765a01848247409c
Parents: 16e80e6
Author: Cao Manh Dat <da...@apache.org>
Authored: Mon Jan 29 15:55:28 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Mon Jan 29 15:57:45 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   6 +
 .../client/solrj/embedded/JettySolrRunner.java  |  16 +-
 .../org/apache/solr/cloud/ElectionContext.java  |  48 +-
 .../cloud/LeaderInitiatedRecoveryThread.java    |   1 +
 .../solr/cloud/RecoveringCoreTermWatcher.java   |  75 +++
 .../org/apache/solr/cloud/RecoveryStrategy.java |  75 ++-
 .../apache/solr/cloud/ZkCollectionTerms.java    |  65 +++
 .../org/apache/solr/cloud/ZkController.java     | 112 +++--
 .../org/apache/solr/cloud/ZkShardTerms.java     | 475 +++++++++++++++++++
 .../api/collections/CreateCollectionCmd.java    |  18 +-
 .../solr/handler/admin/CollectionsHandler.java  |  45 +-
 .../solr/handler/admin/PrepRecoveryOp.java      |   7 +
 .../solr/update/DefaultSolrCoreState.java       |  19 +-
 .../processor/DistributedUpdateProcessor.java   |  86 +++-
 .../org/apache/solr/cloud/ForceLeaderTest.java  | 220 +++++++--
 .../solr/cloud/HttpPartitionOnCommitTest.java   | 178 +++++++
 .../apache/solr/cloud/HttpPartitionTest.java    | 179 +++++--
 .../solr/cloud/LIRRollingUpdatesTest.java       | 457 ++++++++++++++++++
 .../LeaderInitiatedRecoveryOnCommitTest.java    | 178 -------
 ...aderInitiatedRecoveryOnShardRestartTest.java |  12 +-
 .../TestLeaderInitiatedRecoveryThread.java      |   1 +
 .../org/apache/solr/cloud/ZkShardTermsTest.java | 204 ++++++++
 .../solr/update/TestInPlaceUpdatesDistrib.java  |  33 +-
 .../cloud/AbstractFullDistribZkTestBase.java    |   5 +-
 24 files changed, 2112 insertions(+), 403 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index fd98ce6..672f881 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -51,6 +51,10 @@ Upgrade Notes
   Before 7.3, the copied over configset was named the same as the collection name, but 7.3 onwards it will be named
   with an additional ".AUTOCREATED" suffix.
 
+* SOLR-11702: The old LIR implementation (SOLR-5495) is now deprecated and replaced.
+  Solr will support rolling upgrades from old 7.x versions of Solr to the new one until
+  the last release of the 7.x major version.
+
 New Features
 ----------------------
 * SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -93,6 +97,8 @@ New Features
 * SOLR-11617: Alias metadata is now mutable via a new MODIFYALIAS command.  Metadata is returned from LISTALIASES.
   (Gus Heck via David Smiley)
 
+* SOLR-11702: Redesign current LIR implementation (Cao Manh Dat, shalin)
+
 Bug Fixes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index e5b81f8..23a8dc1 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,7 +84,7 @@ public class JettySolrRunner {
   FilterHolder debugFilter;
 
   private boolean waitOnSolr = false;
-  private int lastPort = -1;
+  private int jettyPort = -1;
 
   private final JettyConfig config;
   private final String solrHome;
@@ -280,8 +280,10 @@ public class JettySolrRunner {
       @Override
       public void lifeCycleStarted(LifeCycle arg0) {
 
-        lastPort = getFirstConnectorPort();
-        nodeProperties.setProperty("hostPort", Integer.toString(lastPort));
+        jettyPort = getFirstConnectorPort();
+        int port = jettyPort;
+        if (proxyPort != -1) port = proxyPort;
+        nodeProperties.setProperty("hostPort", Integer.toString(port));
         nodeProperties.setProperty("hostContext", config.context);
 
         root.getServletContext().setAttribute(SolrDispatchFilter.PROPERTIES_ATTRIBUTE, nodeProperties);
@@ -384,7 +386,7 @@ public class JettySolrRunner {
       // if started before, make a new server
       if (startedBefore) {
         waitOnSolr = false;
-        int port = reusePort ? lastPort : this.config.port;
+        int port = reusePort ? jettyPort : this.config.port;
         init(port);
       } else {
         startedBefore = true;
@@ -456,7 +458,7 @@ public class JettySolrRunner {
     if (0 == conns.length) {
       throw new RuntimeException("Jetty Server has no Connectors");
     }
-    return (proxyPort != -1) ? proxyPort : ((ServerConnector) conns[0]).getLocalPort();
+    return ((ServerConnector) conns[0]).getLocalPort();
   }
   
   /**
@@ -465,10 +467,10 @@ public class JettySolrRunner {
    * @exception RuntimeException if there is no Connector
    */
   public int getLocalPort() {
-    if (lastPort == -1) {
+    if (jettyPort == -1) {
       throw new IllegalStateException("You cannot get the port until this instance has started");
     }
-    return (proxyPort != -1) ? proxyPort : lastPort;
+    return (proxyPort != -1) ? proxyPort : jettyPort;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 7169ea8..2d00151 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.Future;
@@ -491,7 +492,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
             rejoinLeaderElection(core);
           }
         }
-        
+
         if (isLeader) {
           // check for any replicas in my shard that were set to down by the previous leader
           try {
@@ -530,6 +531,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     return docCollection.getReplica(replicaName);
   }
 
+  @Deprecated
   public void checkLIR(String coreName, boolean allReplicasInLine)
       throws InterruptedException, KeeperException, IOException {
     if (allReplicasInLine) {
@@ -551,7 +553,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
               leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
         }
       }
-      
+
     } else {
       try (SolrCore core = cc.getCore(coreName)) {
         if (core != null) {
@@ -567,7 +569,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       }
     }
   }
-  
+
+  @Deprecated
   private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
     try (SolrCore core = cc.getCore(coreName)) {
       CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
@@ -577,10 +580,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
 
       if (coll == null || shardId == null) {
         log.error("Cannot start leader-initiated recovery on new leader (core="+
-           coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
+            coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
         return;
       }
-      
+
       String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
       List<String> replicas = null;
       try {
@@ -588,21 +591,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       } catch (NoNodeException nne) {
         // this can be ignored
       }
-      
+
       if (replicas != null && replicas.size() > 0) {
         for (String replicaCoreNodeName : replicas) {
-          
+
           if (coreNodeName.equals(replicaCoreNodeName))
             continue; // added safe-guard so we don't mark this core as down
-          
+
+          if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+            // the replica registered its term so it is running with the new LIR implementation
+            // we can put this replica into recovery by increase our terms
+            zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
+            continue;
+          }
+
           final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
           if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
             log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
                 + lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
-            List<ZkCoreNodeProps> replicaProps =  
+            List<ZkCoreNodeProps> replicaProps =
                 zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
-            
-            if (replicaProps != null && replicaProps.size() > 0) {                
+
+            if (replicaProps != null && replicaProps.size() > 0) {
               ZkCoreNodeProps coreNodeProps = null;
               for (ZkCoreNodeProps p : replicaProps) {
                 if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
@@ -610,17 +620,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
                   break;
                 }
               }
-              
+
               zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
                   collection, shardId, coreNodeProps, core.getCoreDescriptor(),
                   false /* forcePublishState */);
-            }              
+            }
           }
         }
       }
-    } // core gets closed automagically    
+    } // core gets closed automagically
   }
 
+
   // returns true if all replicas are found to be up, false if not
   private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
@@ -743,7 +754,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       // to make sure others participate in sync and leader election, we can be leader
       return true;
     }
-    
+
+    String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+    if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+        && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+      log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
+      return false;
+    }
+
     if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
       log.debug("My last published State was Active, it's okay to be the leader.");
       return true;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
index 8c892ce..9c46236 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -45,6 +45,7 @@ import java.util.List;
  * replica; used by a shard leader to nag a replica into recovering after the
  * leader experiences an error trying to send an update request to the replica.
  */
+@Deprecated
 public class LeaderInitiatedRecoveryThread extends Thread {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
new file mode 100644
index 0000000..26fec97
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start recovery of a core if its term is less than leader's term
+ */
+public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final SolrCore solrCore;
+  // used to prevent the case when term of other replicas get changed, we redo recovery
+  // the idea here is with a specific term of a replica, we only do recovery one
+  private final AtomicLong lastTermDoRecovery;
+
+  RecoveringCoreTermWatcher(SolrCore solrCore) {
+    this.solrCore = solrCore;
+    this.lastTermDoRecovery = new AtomicLong(-1);
+  }
+
+  @Override
+  public boolean onTermChanged(ZkShardTerms.Terms terms) {
+    if (solrCore.isClosed()) {
+      return false;
+    }
+
+    if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
+
+    String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+    if (terms.canBecomeLeader(coreNodeName)) return true;
+    if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
+      log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
+      lastTermDoRecovery.set(terms.getTerm(coreNodeName));
+      solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
+
+    return solrCore.equals(that.solrCore);
+  }
+
+  @Override
+  public int hashCode() {
+    return solrCore.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 3ab4eca..63dfe19 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -458,7 +460,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           core.getCoreDescriptor());
       return;
     }
-    
+
     // we temporary ignore peersync for tlog replicas
     boolean firstTime = replicaType != Replica.Type.TLOG;
 
@@ -516,21 +518,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
       zkController.stopReplicationFromLeader(coreName);
     }
 
+    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     Future<RecoveryInfo> replayFuture = null;
     while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+        final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
 
-        boolean isLeader = leaderUrl.equals(ourUrl);
+        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
         if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
         }
@@ -541,12 +540,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
           zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
           return;
         }
-        
+
         LOG.info("Begin buffering updates. core=[{}]", coreName);
         ulog.bufferUpdates();
         replayed = false;
         
-        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
             ourUrl);
         zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
         
@@ -565,7 +564,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           break;
         }
 
-        sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
         
         if (isClosed()) {
           LOG.info("RecoveryStrategy has been closed");
@@ -585,11 +584,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
         // first thing we just try to sync
         if (firstTime) {
           firstTime = false; // only try sync the first time through the loop
-          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
           PeerSync peerSync = new PeerSync(core,
-              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+              Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
           peerSync.setStartingVersions(recentVersions);
           boolean syncSuccess = peerSync.sync().isSuccess();
           if (syncSuccess) {
@@ -623,7 +622,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         try {
 
-          replicate(zkController.getNodeName(), core, leaderprops);
+          replicate(zkController.getNodeName(), core, leader);
 
           if (isClosed()) {
             LOG.info("RecoveryStrategy has been closed");
@@ -745,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+    int numTried = 0;
+    while (true) {
+      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+      if (mayPutReplicaAsDown && numTried == 1 &&
+          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+        // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
+        zkController.publish(coreDesc, Replica.State.DOWN);
+      }
+      numTried++;
+      final Replica leaderReplica = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+      if (isClosed()) {
+        return leaderReplica;
+      }
+
+      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+        return leaderReplica;
+      }
+
+      try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
+          .withSocketTimeout(1000)
+          .withConnectionTimeout(1000)
+          .build()) {
+        SolrPingResponse resp = httpSolrClient.ping();
+        return leaderReplica;
+      } catch (IOException e) {
+        LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+        Thread.sleep(500);
+      } catch (Exception e) {
+        if (e.getCause() instanceof IOException) {
+          LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+          Thread.sleep(500);
+        } else {
+          return leaderReplica;
+        }
+      }
+    }
+  }
+
   public static Runnable testing_beforeReplayBufferingUpdates;
 
   final private Future<RecoveryInfo> replay(SolrCore core)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
new file mode 100644
index 0000000..b232f9b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.core.CoreDescriptor;
+
+/**
+ * Used to manage all ZkShardTerms of a collection
+ */
+class ZkCollectionTerms implements AutoCloseable {
+  private final String collection;
+  private final Map<String, ZkShardTerms> terms;
+  private final SolrZkClient zkClient;
+
+  ZkCollectionTerms(String collection, SolrZkClient client) {
+    this.collection = collection;
+    this.terms = new HashMap<>();
+    this.zkClient = client;
+    ObjectReleaseTracker.track(this);
+  }
+
+
+  public ZkShardTerms getShard(String shardId) {
+    synchronized (terms) {
+      if (!terms.containsKey(shardId)) terms.put(shardId, new ZkShardTerms(collection, shardId, zkClient));
+      return terms.get(shardId);
+    }
+  }
+
+  public void remove(String shardId, CoreDescriptor coreDescriptor) {
+    synchronized (terms) {
+      if (getShard(shardId).removeTerm(coreDescriptor)) {
+        terms.remove(shardId).close();
+      }
+    }
+  }
+
+  public void close() {
+    synchronized (terms) {
+      terms.values().forEach(ZkShardTerms::close);
+    }
+    ObjectReleaseTracker.release(this);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 365da65..7898e96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -193,7 +193,6 @@ public class ZkController {
   private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
 
   private final SolrZkClient zkClient;
-  private final ZkCmdExecutor cmdExecutor;
   public final ZkStateReader zkStateReader;
   private SolrCloudManager cloudManager;
   private CloudSolrClient cloudSolrClient;
@@ -210,6 +209,7 @@ public class ZkController {
   private LeaderElector overseerElector;
 
   private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
+  private final Map<String, ZkCollectionTerms> collectionToTerms = new HashMap<>();
 
   // for now, this can be null in tests, in which case recovery will be inactive, and other features
   // may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -226,6 +226,7 @@ public class ZkController {
 
   private volatile boolean isClosed;
 
+  @Deprecated
   // keeps track of replicas that have been asked to recover by leaders running on this node
   private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
 
@@ -323,7 +324,7 @@ public class ZkController {
           @Override
           public void command() {
             log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
-
+            clearZkCollectionTerms();
             try {
               zkStateReader.createClusterStateWatchersAndUpdate();
 
@@ -435,7 +436,6 @@ public class ZkController {
     this.overseerRunningMap = Overseer.getRunningMap(zkClient);
     this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
     this.overseerFailureMap = Overseer.getFailureMap(zkClient);
-    cmdExecutor = new ZkCmdExecutor(clientTimeout);
     zkStateReader = new ZkStateReader(zkClient, () -> {
       if (cc != null) cc.securityNodeChanged();
     });
@@ -547,6 +547,9 @@ public class ZkController {
    */
   public void close() {
     this.isClosed = true;
+    synchronized (collectionToTerms) {
+      collectionToTerms.values().forEach(ZkCollectionTerms::close);
+    }
     try {
       for (ElectionContext context : electionContexts.values()) {
         try {
@@ -1034,7 +1037,14 @@ public class ZkController {
       
       final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
       assert coreZkNodeName != null : "we should have a coreNodeName by now";
-      
+
+      ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+
+      // This flag is used for testing rolling updates and should be removed in SOLR-11812
+      boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
+      if (isRunningInNewLIR) {
+        shardTerms.registerTerm(coreZkNodeName);
+      }
       String shardId = cloudDesc.getShardId();
       Map<String,Object> props = new HashMap<>();
       // we only put a subset of props into the leader node
@@ -1118,15 +1128,17 @@ public class ZkController {
           }
         }
         boolean didRecovery
-            = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
+            = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, shardId, core, cc, afterExpiration);
         if (!didRecovery) {
           if (isTlogReplicaAndNotLeader) {
             startReplicationFromLeader(coreName, true);
           }
           publish(desc, Replica.State.ACTIVE);
         }
-        
-        
+
+        if (isRunningInNewLIR && replicaType != Type.PULL) {
+          shardTerms.addListener(new RecoveringCoreTermWatcher(core));
+        }
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
       }
       
@@ -1295,7 +1307,7 @@ public class ZkController {
    * Returns whether or not a recovery was started
    */
   private boolean checkRecovery(boolean recoverReloadedCores, final boolean isLeader, boolean skipRecovery,
-                                final String collection, String shardId,
+                                final String collection, String coreZkNodeName, String shardId,
                                 SolrCore core, CoreContainer cc, boolean afterExpiration) {
     if (SKIP_AUTO_RECOVERY) {
       log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
@@ -1322,6 +1334,13 @@ public class ZkController {
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
         return true;
       }
+
+      ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
+      if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
+        log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
+        core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+        return true;
+      }
     } else {
       log.info("I am the leader, no recovery necessary");
     }
@@ -1372,6 +1391,7 @@ public class ZkController {
       String shardId = cd.getCloudDescriptor().getShardId();
       
       String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
       // If the leader initiated recovery, then verify that this replica has performed
       // recovery as requested before becoming active; don't even look at lirState if going down
       if (state != Replica.State.DOWN) {
@@ -1394,7 +1414,7 @@ public class ZkController {
           }
         }
       }
-      
+
       Map<String,Object> props = new HashMap<>();
       props.put(Overseer.QUEUE_OPERATION, "state");
       props.put(ZkStateReader.STATE_PROP, state.toString());
@@ -1430,6 +1450,11 @@ public class ZkController {
         log.info("The core '{}' had failed to initialize before.", cd.getName());
       }
 
+      // This flag is used for testing rolling updates and should be removed in SOLR-11812
+      boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
+      if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
+        getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
+      }
       ZkNodeProps m = new ZkNodeProps(props);
       
       if (updateLastState) {
@@ -1441,23 +1466,28 @@ public class ZkController {
     }
   }
 
-  private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
-                                           final ClusterState state, final String coreNodeName) {
-
-    final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+  public ZkShardTerms getShardTerms(String collection, String shardId) {
+    return getCollectionTerms(collection).getShard(shardId);
+  }
 
-    final String shardId = state.getShardId(getNodeName(), desc.getName());
+  private ZkCollectionTerms getCollectionTerms(String collection) {
+    synchronized (collectionToTerms) {
+      if (!collectionToTerms.containsKey(collection)) collectionToTerms.put(collection, new ZkCollectionTerms(collection, zkClient));
+      return collectionToTerms.get(collection);
+    }
+  }
 
-    if (shardId != null) {
-      cloudDesc.setShardId(shardId);
-      return false;
+  public void clearZkCollectionTerms() {
+    synchronized (collectionToTerms) {
+      collectionToTerms.values().forEach(ZkCollectionTerms::close);
+      collectionToTerms.clear();
     }
-    return true;
   }
 
   public void unregister(String coreName, CoreDescriptor cd) throws Exception {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
+    getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
 
     if (Strings.isNullOrEmpty(collection)) {
       log.error("No collection was specified.");
@@ -1733,7 +1763,7 @@ public class ZkController {
     boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
     if (!isLeader && !SKIP_AUTO_RECOVERY) {
 
-      // detect if this core is in leader-initiated recovery and if so, 
+      // detect if this core is in leader-initiated recovery and if so,
       // then we don't need the leader to wait on seeing the down state
       Replica.State lirState = null;
       try {
@@ -1743,9 +1773,9 @@ public class ZkController {
             " is in leader-initiated recovery due to: " + exc, exc);
       }
 
-      if (lirState != null) {
-        log.debug("Replica " + myCoreNodeName +
-            " is already in leader-initiated recovery, so not waiting for leader to see down state.");
+      if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
+        log.debug("Term of replica " + myCoreNodeName +
+            " is already less than leader, so not waiting for leader to see down state.");
       } else {
 
         log.info("Replica " + myCoreNodeName +
@@ -2055,6 +2085,7 @@ public class ZkController {
    * false means the node is not live either, so no point in trying to send recovery commands
    * to it.
    */
+  @Deprecated
   public boolean ensureReplicaInLeaderInitiatedRecovery(
       final CoreContainer container,
       final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
@@ -2117,13 +2148,14 @@ public class ZkController {
                 " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
             replicaCoreProps.getCoreName(), replicaCoreNodeName);
         // publishDownState will be false to avoid publishing the "down" state too many times
-        // as many errors can occur together and will each call into this method (SOLR-6189)        
+        // as many errors can occur together and will each call into this method (SOLR-6189)
       }
     }
 
     return nodeIsLive;
   }
 
+  @Deprecated
   public boolean isReplicaInRecoveryHandling(String replicaUrl) {
     boolean exists = false;
     synchronized (replicasInLeaderInitiatedRecovery) {
@@ -2132,12 +2164,14 @@ public class ZkController {
     return exists;
   }
 
+  @Deprecated
   public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
     synchronized (replicasInLeaderInitiatedRecovery) {
       replicasInLeaderInitiatedRecovery.remove(replicaUrl);
     }
   }
 
+  @Deprecated
   public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
     final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
     if (stateObj == null) {
@@ -2147,6 +2181,7 @@ public class ZkController {
     return stateStr == null ? null : Replica.State.getState(stateStr);
   }
 
+  @Deprecated
   public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
 
     if (collection == null || shardId == null || coreNodeName == null)
@@ -2191,6 +2226,7 @@ public class ZkController {
     return stateObj;
   }
 
+  @Deprecated
   public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
       Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
@@ -2199,12 +2235,12 @@ public class ZkController {
           + "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
       return; // if we don't have complete data about a core in cloud mode, do nothing
     }
-    
+
     assert leaderCd != null;
     assert leaderCd.getCloudDescriptor() != null;
 
     String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
-    
+
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
 
     if (state == Replica.State.ACTIVE) {
@@ -2269,29 +2305,29 @@ public class ZkController {
   private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
                                        String znodePath, byte[] znodeData,
                                        boolean retryOnConnLoss) throws KeeperException, InterruptedException {
-    
+
 
     if (!leaderCd.getCloudDescriptor().isLeader()) {
       log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
       throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
     }
-    
+
     ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
     ElectionContext context = electionContexts.get(key);
-    
+
     // we make sure we locally think we are the leader before and after getting the context - then
     // we only try zk if we still think we are the leader and have our leader context
     if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
       log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
       throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
     }
-    
+
     // we think we are the leader - get the expected shard leader version
     // we use this version and multi to ensure *only* the current zk registered leader
     // for a shard can put a replica into LIR
-    
+
     Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
-    
+
     // TODO: should we do this optimistically to avoid races?
     if (zkClient.exists(znodePath, retryOnConnLoss)) {
       List<Op> ops = new ArrayList<>(2);
@@ -2306,7 +2342,7 @@ public class ZkController {
       } catch (KeeperException.NodeExistsException nee) {
         // if it exists, that's great!
       }
-      
+
       // we only create the entry if the context we are using is registered as the current leader in ZK
       List<Op> ops = new ArrayList<>(2);
       ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
@@ -2316,11 +2352,13 @@ public class ZkController {
     }
   }
 
-  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
+  @Deprecated
+  public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
     return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
   }
 
-  public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
+  @Deprecated
+  public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
     return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
   }
 
@@ -2608,12 +2646,6 @@ public class ZkController {
     };
   }
 
-  public String getLeaderSeqPath(String collection, String coreNodeName) {
-    ContextKey key = new ContextKey(collection, coreNodeName);
-    ElectionContext context = electionContexts.get(key);
-    return context != null ? context.leaderSeqPath : null;
-  }
-
   /**
    * Thrown during leader initiated recovery process if current node is not leader
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
new file mode 100644
index 0000000..7dc0d57
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class used for interact with a ZK term node.
+ * Each ZK term node relates to a shard of a collection and have this format (in json)
+ * <p>
+ * <code>
+ * {
+ *   "replicaNodeName1" : 1,
+ *   "replicaNodeName2" : 2,
+ *   ..
+ * }
+ * </code>
+ * <p>
+ * The values correspond to replicas are called terms.
+ * Only replicas with highest term value are considered up to date and be able to become leader and serve queries.
+ * <p>
+ * Terms can only updated in two strict ways:
+ * <ul>
+ * <li>A replica sets its term equals to leader's term
+ * <li>The leader increase its term and some other replicas by 1
+ * </ul>
+ * This class should not be reused after {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
+ */
+public class ZkShardTerms implements AutoCloseable{
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private final Object writingLock = new Object();
+  private final String collection;
+  private final String shard;
+  private final String znodePath;
+  private final SolrZkClient zkClient;
+  private final Set<CoreTermWatcher> listeners = new HashSet<>();
+  private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+  private Terms terms;
+
+  // Listener of a core for shard's term change events
+  interface CoreTermWatcher {
+    // return true if the listener wanna to be triggered in the next time
+    boolean onTermChanged(Terms terms);
+  }
+
+  public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
+    this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
+    this.collection = collection;
+    this.shard = shard;
+    this.zkClient = zkClient;
+    ensureTermNodeExist();
+    refreshTerms();
+    retryRegisterWatcher();
+    ObjectReleaseTracker.track(this);
+  }
+
+  /**
+   * Ensure that leader's term is higher than some replica's terms
+   * @param leader coreNodeName of leader
+   * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
+   */
+  public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+    Terms newTerms;
+    while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
+      if (forceSaveTerms(newTerms)) return;
+    }
+  }
+
+  /**
+   * Can this replica become leader or is this replica's term equals to leader's term?
+   * @param coreNodeName of the replica
+   * @return true if this replica can become leader, false if otherwise
+   */
+  public boolean canBecomeLeader(String coreNodeName) {
+    return terms.canBecomeLeader(coreNodeName);
+  }
+
+  /**
+   * Did this replica registered its term? This is a sign to check f
+   * @param coreNodeName of the replica
+   * @return true if this replica registered its term, false if otherwise
+   */
+  public boolean registered(String coreNodeName) {
+    return terms.getTerm(coreNodeName) != null;
+  }
+
+  public void close() {
+    // no watcher will be registered
+    isClosed.set(true);
+    synchronized (listeners) {
+      listeners.clear();
+    }
+    ObjectReleaseTracker.release(this);
+  }
+
+  // package private for testing, only used by tests
+  Map<String, Long> getTerms() {
+    synchronized (writingLock) {
+      return new HashMap<>(terms.values);
+    }
+  }
+
+  /**
+   * Add a listener so the next time the shard's term get updated, listeners will be called
+   */
+  void addListener(CoreTermWatcher listener) {
+    synchronized (listeners) {
+      listeners.add(listener);
+    }
+  }
+
+  /**
+   * Remove the coreNodeName from terms map and also remove any expired listeners
+   * @return Return true if this object should not be reused
+   */
+  boolean removeTerm(CoreDescriptor cd) {
+    int numListeners;
+    synchronized (listeners) {
+      // solrcore already closed
+      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
+      numListeners = listeners.size();
+    }
+    Terms newTerms;
+    while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
+      try {
+        if (saveTerms(newTerms)) return numListeners == 0;
+      } catch (KeeperException.NoNodeException e) {
+        return true;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Register a replica's term (term value will be 0).
+   * If a term is already associate with this replica do nothing
+   * @param coreNodeName of the replica
+   */
+  void registerTerm(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  /**
+   * Set a replica's term equals to leader's term
+   * @param coreNodeName of the replica
+   */
+  public void setEqualsToMax(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  public long getTerm(String coreNodeName) {
+    Long term = terms.getTerm(coreNodeName);
+    return term == null? -1 : term;
+  }
+
+  // package private for testing, only used by tests
+  int getNumListeners() {
+    synchronized (listeners) {
+      return listeners.size();
+    }
+  }
+
+  /**
+   * Set new terms to ZK.
+   * In case of correspond ZK term node is not created, create it
+   * @param newTerms to be set
+   * @return true if terms is saved successfully to ZK, false if otherwise
+   */
+  private boolean forceSaveTerms(Terms newTerms) {
+    try {
+      return saveTerms(newTerms);
+    } catch (KeeperException.NoNodeException e) {
+      ensureTermNodeExist();
+      return false;
+    }
+  }
+
+  /**
+   * Set new terms to ZK, the version of new terms must match the current ZK term node
+   * @param newTerms to be set
+   * @return true if terms is saved successfully to ZK, false if otherwise
+   * @throws KeeperException.NoNodeException correspond ZK term node is not created
+   */
+  private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
+    byte[] znodeData = Utils.toJSON(newTerms.values);
+    try {
+      Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
+      setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+      return true;
+    } catch (KeeperException.BadVersionException e) {
+      log.info("Failed to save terms, version is not match, retrying");
+      refreshTerms();
+    } catch (KeeperException.NoNodeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error save shard term for collection:" + collection, e);
+    }
+    return false;
+  }
+
+  /**
+   * Create correspond ZK term node
+   */
+  private void ensureTermNodeExist() {
+    String path = "/collections/"+collection+ "/terms";
+    try {
+      if (!zkClient.exists(path, true)) {
+        try {
+          zkClient.makePath(path, true);
+        } catch (KeeperException.NodeExistsException e) {
+          // it's okay if another beats us creating the node
+        }
+      }
+      path += "/"+shard;
+      if (!zkClient.exists(path, true)) {
+        try {
+          Map<String, Long> initialTerms = new HashMap<>();
+          zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+        } catch (KeeperException.NodeExistsException e) {
+          // it's okay if another beats us creating the node
+        }
+      }
+    }  catch (InterruptedException e) {
+      Thread.interrupted();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+    }
+  }
+
+  /**
+   * Fetch latest terms from ZK
+   */
+  public void refreshTerms() {
+    Terms newTerms;
+    try {
+      Stat stat = new Stat();
+      byte[] data = zkClient.getData(znodePath, null, stat, true);
+      newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+    } catch (KeeperException e) {
+      Thread.interrupted();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+    } catch (InterruptedException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+    }
+
+    setNewTerms(newTerms);
+  }
+
+  /**
+   * Retry register a watcher to the correspond ZK term node
+   */
+  private void retryRegisterWatcher() {
+    while (!isClosed.get()) {
+      try {
+        registerWatcher();
+        return;
+      } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
+        isClosed.set(true);
+        log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
+        return;
+      } catch (KeeperException e) {
+        log.warn("Failed watching shard term for collection:{}, retrying!", collection, e);
+        try {
+          zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
+        } catch (TimeoutException te) {
+          if (Thread.interrupted()) {
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, te);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Register a watcher to the correspond ZK term node
+   */
+  private void registerWatcher() throws KeeperException {
+    Watcher watcher = event -> {
+      // session events are not change events, and do not remove the watcher
+      if (Watcher.Event.EventType.None == event.getType()) {
+        return;
+      }
+      retryRegisterWatcher();
+      // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
+      refreshTerms();
+    };
+    try {
+      // exists operation is faster than getData operation
+      zkClient.exists(znodePath, watcher, true);
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, e);
+    }
+  }
+
+
+  /**
+   * Atomically update {@link ZkShardTerms#terms} and call listeners
+   * @param newTerms to be set
+   */
+  private void setNewTerms(Terms newTerms) {
+    boolean isChanged = false;
+    synchronized (writingLock) {
+      if (terms == null || newTerms.version > terms.version) {
+        terms = newTerms;
+        isChanged = true;
+      }
+    }
+    if (isChanged) onTermUpdates(newTerms);
+  }
+
+  private void onTermUpdates(Terms newTerms) {
+    synchronized (listeners) {
+      listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
+    }
+  }
+
+  /**
+   * Hold values of terms, this class is immutable
+   */
+  static class Terms {
+    private final Map<String, Long> values;
+    // ZK node version
+    private final int version;
+
+    public Terms () {
+      this(new HashMap<>(), 0);
+    }
+
+    public Terms(Map<String, Long> values, int version) {
+      this.values = values;
+      this.version = version;
+    }
+
+    /**
+     * Can this replica become leader or is this replica's term equals to leader's term?
+     * @param coreNodeName of the replica
+     * @return true if this replica can become leader, false if otherwise
+     */
+    boolean canBecomeLeader(String coreNodeName) {
+      if (values.isEmpty()) return true;
+      long maxTerm = Collections.max(values.values());
+      return values.getOrDefault(coreNodeName, 0L) == maxTerm;
+    }
+
+    Long getTerm(String coreNodeName) {
+      return values.get(coreNodeName);
+    }
+
+    /**
+     * Return a new {@link Terms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
+     * @param leader coreNodeName of leader
+     * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
+     * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
+     */
+    Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
+      if (!values.containsKey(leader)) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+      }
+
+      boolean changed = false;
+      boolean foundReplicasInLowerTerms = false;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      long leaderTerm = newValues.get(leader);
+      for (String replica : newValues.keySet()) {
+        if (replicasNeedingRecovery.contains(replica)) foundReplicasInLowerTerms = true;
+        if (Objects.equals(newValues.get(replica), leaderTerm)) {
+          if(replicasNeedingRecovery.contains(replica)) {
+            changed = true;
+          } else {
+            newValues.put(replica, leaderTerm+1);
+          }
+        }
+      }
+
+      // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
+      // this may indicate that the current value is stale
+      if (!changed && foundReplicasInLowerTerms) return null;
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Return a new {@link Terms} in which term of {@code coreNodeName} is removed
+     * @param coreNodeName of the replica
+     * @return null if term of {@code coreNodeName} is already not exist
+     */
+    Terms removeTerm(String coreNodeName) {
+      if (!values.containsKey(coreNodeName)) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.remove(coreNodeName);
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Return a new {@link Terms} in which the associate term of {@code coreNodeName} is not null
+     * @param coreNodeName of the replica
+     * @return null if term of {@code coreNodeName} is already exist
+     */
+    Terms registerTerm(String coreNodeName) {
+      if (values.containsKey(coreNodeName)) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.put(coreNodeName, 0L);
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Return a new {@link Terms} in which the term of {@code coreNodeName} is max
+     * @param coreNodeName of the replica
+     * @return null if term of {@code coreNodeName} is already maximum
+     */
+    Terms setEqualsToMax(String coreNodeName) {
+      long maxTerm;
+      try {
+        maxTerm = Collections.max(values.values());
+      } catch (NoSuchElementException e){
+        maxTerm = 0;
+      }
+      if (values.get(coreNodeName) == maxTerm) return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.put(coreNodeName, maxTerm);
+      return new Terms(newValues, version);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4c6ce47..428ad83 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
 import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
 import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
 import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
 import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
@@ -392,7 +393,22 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
   public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
     log.debug("Check for collection zkNode:" + collection);
     String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
-
+    // clean up old terms node
+    String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms";
+    try {
+      if (stateManager.hasData(termsPath)) {
+        List<String> paths = stateManager.listData(termsPath);
+        for (String path : paths) {
+          stateManager.removeData(termsPath + "/" + path, -1);
+        }
+        stateManager.removeData(termsPath, -1);
+      }
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+    } catch (KeeperException | IOException | BadVersionException e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+    }
     try {
       if (!stateManager.hasData(collectionPath)) {
         log.debug("Creating collection in ZooKeeper:" + collection);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index cebb2d0..dcc3de6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,8 +28,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.function.BiConsumer;
 
 import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -1067,7 +1070,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
   }
 
   private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
-    ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+    ZkController zkController = handler.coreContainer.getZkController();
+    ClusterState clusterState = zkController.getClusterState();
     String collectionName = req.getParams().required().get(COLLECTION_PROP);
     String sliceId = req.getParams().required().get(SHARD_ID_PROP);
 
@@ -1079,7 +1083,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           "No shard with name " + sliceId + " exists for collection " + collectionName);
     }
 
-    try {
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
       // if an active replica is the leader, then all is fine already
       Replica leader = slice.getLeader();
       if (leader != null && leader.getState() == State.ACTIVE) {
@@ -1096,20 +1100,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         handler.coreContainer.getZkController().getZkClient().clean(lirPath);
       }
 
+      final Set<String> liveNodes = clusterState.getLiveNodes();
+      List<Replica> liveReplicas = slice.getReplicas().stream()
+          .filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
+      boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
+          .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
+      // we won't increase replica's terms if exist a live replica with term equals to leader
+      if (shouldIncreaseReplicaTerms) {
+        OptionalLong optionalMaxTerm = liveReplicas.stream()
+            .filter(rep -> zkShardTerms.registered(rep.getName()))
+            .mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
+            .max();
+        // increase terms of replicas less out-of-sync
+        if (optionalMaxTerm.isPresent()) {
+          liveReplicas.stream()
+              .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
+              .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+        }
+      }
+
       // Call all live replicas to prepare themselves for leadership, e.g. set last published
       // state to active.
-      for (Replica rep : slice.getReplicas()) {
-        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
-          ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+      for (Replica rep : liveReplicas) {
+        ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
 
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
-          params.set(CoreAdminParams.CORE, rep.getStr("core"));
-          String nodeName = rep.getNodeName();
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+        params.set(CoreAdminParams.CORE, rep.getStr("core"));
+        String nodeName = rep.getNodeName();
 
-          OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
-              CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
-        }
+        OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+            CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
       }
 
       // Wait till we have an active leader

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0a6d5ce..3647735 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.Objects;
 
 import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -124,6 +125,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
                 log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
               }
 
+              ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
+              // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+              if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+                shardTerms.refreshTerms();
+              }
+
               boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
               log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
                   ", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index b418a19..739604f 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -308,19 +308,20 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
           // after the current one, and if there is, bail
           boolean locked = recoveryLock.tryLock();
           try {
-            if (!locked) {
-              if (recoveryWaiting.get() > 0) {
-                return;
-              }
-              recoveryWaiting.incrementAndGet();
-            } else {
-              recoveryWaiting.incrementAndGet();
-              cancelRecovery();
+            if (!locked && recoveryWaiting.get() > 0) {
+              return;
             }
+
+            recoveryWaiting.incrementAndGet();
+            cancelRecovery();
             
             recoveryLock.lock();
             try {
-              recoveryWaiting.decrementAndGet();
+              // don't use recoveryLock.getQueueLength() for this
+              if (recoveryWaiting.decrementAndGet() > 0) {
+                // another recovery waiting behind us, let it run now instead of after we finish
+                return;
+              }
               
               // to be air tight we must also check after lock
               if (cc.isShutDown()) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index de031a2..3cff171 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -44,6 +45,7 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
@@ -184,6 +186,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private final boolean cloneRequiredOnLeader;
   private final Replica.Type replicaType;
 
+  @Deprecated
+  // this flag, used for testing rolling updates, should be removed by SOLR-11812
+  private final boolean isOldLIRMode;
+
   public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
     this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
   }
@@ -202,6 +208,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
     this.vinfo = ulog == null ? null : ulog.getVersionInfo();
+    this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
     versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
     returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
 
@@ -343,13 +350,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
 
         List<Node> nodes = new ArrayList<>(replicaProps.size());
+        ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
         for (ZkCoreNodeProps props : replicaProps) {
-          if (skipList != null) {
-            boolean skip = skipListSet.contains(props.getCoreUrl());
-            log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
-            if (!skip) {
-              nodes.add(new StdNode(props, collection, shardId));
-            }
+          String coreNodeName = ((Replica) props.getNodeProps()).getName();
+          if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
+            log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
+          } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
+            log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
           } else {
             nodes.add(new StdNode(props, collection, shardId));
           }
@@ -751,7 +758,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // TODO - we may need to tell about more than one error...
 
     List<Error> errorsForClient = new ArrayList<>(errors.size());
-    
+    Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
     for (final SolrCmdDistributor.Error error : errors) {
       
       if (error.req.node instanceof RetryNode) {
@@ -843,18 +850,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             && foundErrorNodeInReplicaList // we found an error for one of replicas
             && !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
           try {
+            String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
             // if false, then the node is probably not "live" anymore
             // and we do not need to send a recovery message
             Throwable rootCause = SolrException.getRootCause(error.e);
-            log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
-            zkController.ensureReplicaInLeaderInitiatedRecovery(
-                req.getCore().getCoreContainer(),
-                collection,
-                shardId,
-                stdNode.getNodeProps(),
-                req.getCore().getCoreDescriptor(),
-                false /* forcePublishState */
-            );
+            if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
+              log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
+              ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
+              failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
+              failedReplicas.get(shardInfo).add(coreNodeName);
+            } else {
+              // The replica did not registered its term, so it must run with old LIR implementation
+              log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+              zkController.ensureReplicaInLeaderInitiatedRecovery(
+                  req.getCore().getCoreContainer(),
+                  collection,
+                  shardId,
+                  stdNode.getNodeProps(),
+                  req.getCore().getCoreDescriptor(),
+                  false /* forcePublishState */
+              );
+            }
           } catch (Exception exc) {
             Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
             log.error("Leader failed to set replica " +
@@ -873,6 +889,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
       }
     }
+    if (!isOldLIRMode) {
+      for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
+        ShardInfo shardInfo = entry.getKey();
+        zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
+      }
+    }
     // in either case, we need to attach the achieved and min rf to the response.
     if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
       int achievedRf = Integer.MAX_VALUE;
@@ -905,6 +927,38 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
   }
 
+  private class ShardInfo {
+    private String collection;
+    private String shard;
+    private String leader;
+
+    public ShardInfo(String collection, String shard, String leader) {
+      this.collection = collection;
+      this.shard = shard;
+      this.leader = leader;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ShardInfo shardInfo = (ShardInfo) o;
+
+      if (!collection.equals(shardInfo.collection)) return false;
+      if (!shard.equals(shardInfo.shard)) return false;
+      return leader.equals(shardInfo.leader);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = collection.hashCode();
+      result = 31 * result + shard.hashCode();
+      result = 31 * result + leader.hashCode();
+      return result;
+    }
+  }
+
  
   // must be synchronized by bucket
   private void doLocalAdd(AddUpdateCommand cmd) throws IOException {