You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/03/04 05:57:17 UTC

lucene-solr:master: SOLR-12011: Consistence problem when in-sync replicas are DOWN

Repository: lucene-solr
Updated Branches:
  refs/heads/master ad7e94afb -> 9de4225e9


SOLR-12011: Consistence problem when in-sync replicas are DOWN


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9de4225e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9de4225e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9de4225e

Branch: refs/heads/master
Commit: 9de4225e9a54ba987c2c7c9d4510bea3e4f9de97
Parents: ad7e94a
Author: Cao Manh Dat <da...@apache.org>
Authored: Sun Mar 4 12:57:05 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Sun Mar 4 12:57:05 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   5 +
 .../org/apache/solr/cloud/CloudDescriptor.java  |   2 +-
 .../org/apache/solr/cloud/ElectionContext.java  |  84 ++----
 .../solr/cloud/RecoveringCoreTermWatcher.java   |   2 +-
 .../org/apache/solr/cloud/ZkController.java     |  15 +-
 .../org/apache/solr/cloud/ZkShardTerms.java     | 156 ++++++++++-
 .../org/apache/solr/core/CoreContainer.java     |   2 +
 .../solr/handler/admin/CollectionsHandler.java  |   2 +-
 .../solr/handler/admin/CoreAdminOperation.java  |  11 -
 .../solr/handler/admin/PrepRecoveryOp.java      |   5 +-
 .../solr/handler/admin/RestoreCoreOp.java       |  13 +
 .../org/apache/solr/handler/admin/SplitOp.java  |  21 ++
 .../processor/DistributedUpdateProcessor.java   |  90 +++---
 .../apache/solr/cloud/TestCloudConsistency.java | 276 +++++++++++++++++++
 .../org/apache/solr/cloud/TestPullReplica.java  |  14 +
 .../org/apache/solr/cloud/ZkShardTermsTest.java |  41 ++-
 16 files changed, 595 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7b4f878..e0ef777 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -78,6 +78,9 @@ Upgrade Notes
 * LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0.  This new version
   of JTS is now dual-licensed to include a BSD style license.
 
+* SOLR-12011: Replicas which are not up-to-date are not allowed to become leader. Use the FORCELEADER API to
+  allow these replicas to become leader.
+
 New Features
 ----------------------
 * SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -236,6 +239,8 @@ Bug Fixes
   Also changed the display label in the Admin UI from routerField to router.field to match the actual API.
   (Shawn Heisey via Cassandra Targett)
 
+* SOLR-12011: Consistence problem when in-sync replicas are DOWN. (Cao Manh Dat)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index 32cb65b..068191e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -42,7 +42,7 @@ public class CloudDescriptor {
   // set to true once a core has registered in zk
   // set to false on detecting a session expiration
   private volatile boolean hasRegistered = false;
-  volatile Replica.State lastPublished = Replica.State.ACTIVE;
+  private volatile Replica.State lastPublished = Replica.State.ACTIVE;
 
   public static final String NUM_SHARDS = "numShards";
   

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 2d00151..3ba4dfc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,9 +20,10 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -359,12 +360,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
         
         replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
-        
+        String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
         // should I be leader?
-        if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
+        if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+            && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+          log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
           rejoinLeaderElection(core);
           return;
         }
+
+        if (isClosed) {
+          return;
+        }
         
         log.info("I may be the new leader - try and sync");
         
@@ -516,8 +523,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         zkStateReader.forceUpdateCollection(collection);
         ClusterState clusterState = zkStateReader.getClusterState();
         Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
-        if (rep != null && rep.getState() != Replica.State.ACTIVE
-            && rep.getState() != Replica.State.RECOVERING) {
+        if (rep != null && rep.getState() != Replica.State.ACTIVE) {
           log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
           zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
         }
@@ -593,40 +599,43 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       }
 
       if (replicas != null && replicas.size() > 0) {
+        // replicas that run the new LIR (they registered a term) but whose lirState is DOWN or RECOVERY_FAILED
+        Set<String> replicasMustBeInLowerTerm = new HashSet<>();
         for (String replicaCoreNodeName : replicas) {
 
           if (coreNodeName.equals(replicaCoreNodeName))
             continue; // added safe-guard so we don't mark this core as down
 
-          if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
-            // the replica registered its term so it is running with the new LIR implementation
-            // we can put this replica into recovery by increase our terms
-            zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
-            continue;
-          }
-
           final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
           if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
             log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
                 + lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
-            List<ZkCoreNodeProps> replicaProps =
-                zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
+            List<Replica> replicasProps =
+                zkController.getZkStateReader().getClusterState().getCollection(collection)
+                    .getSlice(shardId).getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
 
-            if (replicaProps != null && replicaProps.size() > 0) {
+            if (replicasProps != null && replicasProps.size() > 0) {
               ZkCoreNodeProps coreNodeProps = null;
-              for (ZkCoreNodeProps p : replicaProps) {
-                if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
-                  coreNodeProps = p;
+              for (Replica p : replicasProps) {
+                if (p.getName().equals(replicaCoreNodeName)) {
+                  coreNodeProps = new ZkCoreNodeProps(p);
                   break;
                 }
               }
 
-              zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
-                  collection, shardId, coreNodeProps, core.getCoreDescriptor(),
-                  false /* forcePublishState */);
+              if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+                replicasMustBeInLowerTerm.add(replicaCoreNodeName);
+              } else {
+                zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
+                    collection, shardId, coreNodeProps, core.getCoreDescriptor(),
+                    false /* forcePublishState */);
+              }
             }
           }
         }
+        // these replicas registered their terms, so they are running with the new LIR implementation;
+        // we can put them into recovery by increasing our own term
+        zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, replicasMustBeInLowerTerm);
       }
     } // core gets closed automagically
   }
@@ -741,39 +750,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     leaderElector.joinElection(this, true);
   }
 
-  private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
-    log.debug("Checking if I should try and be the leader.");
-    
-    if (isClosed) {
-      log.debug("Bailing on leader process because we have been closed");
-      return false;
-    }
-    
-    if (!weAreReplacement) {
-      // we are the first node starting in the shard - there is a configurable wait
-      // to make sure others participate in sync and leader election, we can be leader
-      return true;
-    }
-
-    String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-    if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
-        && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
-      log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
-      return false;
-    }
-
-    if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
-      log.debug("My last published State was Active, it's okay to be the leader.");
-      return true;
-    }
-    log.debug("My last published State was "
-        + core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
-        + ", I won't be the leader.");
-    // TODO: and if no one is a good candidate?
-    
-    return false;
-  }
-  
 }
 
 final class OverseerElectionContext extends ElectionContext {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index 26fec97..90a500a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -48,7 +48,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
     if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
 
     String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-    if (terms.canBecomeLeader(coreNodeName)) return true;
+    if (terms.haveHighestTermValue(coreNodeName)) return true;
     if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
       log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
       lastTermDoRecovery.set(terms.getTerm(coreNodeName));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index cb1fcea..a159db5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1045,7 +1045,7 @@ public class ZkController {
 
       // This flag is used for testing rolling updates and should be removed in SOLR-11812
       boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
-      if (isRunningInNewLIR) {
+      if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) {
         shardTerms.registerTerm(coreZkNodeName);
       }
       String shardId = cloudDesc.getShardId();
@@ -1455,13 +1455,20 @@ public class ZkController {
 
       // This flag is used for testing rolling updates and should be removed in SOLR-11812
       boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
-      if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
-        getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
+      // pull replicas are excluded because their terms are not considered
+      if (state == Replica.State.RECOVERING && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
+        // the published state is used by clients, and a replica's state can change from RECOVERING to DOWN without
+        // finishing recovery; by calling this we will know whether a replica actually finished recovery or not
+        getShardTerms(collection, shardId).startRecovering(coreNodeName);
       }
+      if (state == Replica.State.ACTIVE && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
+        getShardTerms(collection, shardId).doneRecovering(coreNodeName);
+      }
+
       ZkNodeProps m = new ZkNodeProps(props);
       
       if (updateLastState) {
-        cd.getCloudDescriptor().lastPublished = state;
+        cd.getCloudDescriptor().setLastPublished(state);
       }
       overseerJobQueue.offer(Utils.toJSON(m));
     } finally {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 7dc0d57..50b424c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
@@ -100,6 +99,8 @@ public class ZkShardTerms implements AutoCloseable{
    * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
    */
   public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+    if (replicasNeedingRecovery.isEmpty()) return;
+
     Terms newTerms;
     while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
       if (forceSaveTerms(newTerms)) return;
@@ -107,7 +108,7 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   /**
-   * Can this replica become leader or is this replica's term equals to leader's term?
+   * Can this replica become leader?
    * @param coreNodeName of the replica
    * @return true if this replica can become leader, false if otherwise
    */
@@ -116,6 +117,15 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   /**
+   * Should leader skip sending updates to this replica?
+   * @param coreNodeName of the replica
+   * @return true if this replica's term is not the highest term (the leader should skip it), false otherwise
+   */
+  public boolean skipSendingUpdatesTo(String coreNodeName) {
+    return !terms.haveHighestTermValue(coreNodeName);
+  }
+
+  /**
    * Did this replica registered its term? This is a sign to check f
    * @param coreNodeName of the replica
    * @return true if this replica registered its term, false if otherwise
@@ -184,16 +194,59 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   /**
-   * Set a replica's term equals to leader's term
+   * Set a replica's term equals to leader's term.
+   * This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER}
    * @param coreNodeName of the replica
    */
-  public void setEqualsToMax(String coreNodeName) {
+  public void setTermEqualsToLeader(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  public void setTermToZero(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  /**
+   * Mark {@code coreNodeName} as recovering
+   */
+  public void startRecovering(String coreNodeName) {
+    Terms newTerms;
+    while ( (newTerms = terms.startRecovering(coreNodeName)) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  /**
+   * Mark {@code coreNodeName} as finished recovering
+   */
+  public void doneRecovering(String coreNodeName) {
     Terms newTerms;
-    while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
+    while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
   }
 
+  /**
+   * When first updates come in, all replicas have some data now,
+   * so we must switch from term 0 (registered) to 1 (have some data)
+   */
+  public void ensureHighestTermsAreNotZero() {
+    Terms newTerms;
+    while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) {
+      if (forceSaveTerms(newTerms)) break;
+    }
+  }
+
+  public long getHighestTerm() {
+    return terms.getMaxTerm();
+  }
+
   public long getTerm(String coreNodeName) {
     Long term = terms.getTerm(coreNodeName);
     return term == null? -1 : term;
@@ -232,6 +285,7 @@ public class ZkShardTerms implements AutoCloseable{
     try {
       Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
       setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+      log.info("Successful update terms at {} to {}", znodePath, newTerms);
       return true;
     } catch (KeeperException.BadVersionException e) {
       log.info("Failed to save terms, version is not match, retrying");
@@ -367,6 +421,7 @@ public class ZkShardTerms implements AutoCloseable{
    */
   static class Terms {
     private final Map<String, Long> values;
+    private final long maxTerm;
     // ZK node version
     private final int version;
 
@@ -377,14 +432,25 @@ public class ZkShardTerms implements AutoCloseable{
     public Terms(Map<String, Long> values, int version) {
       this.values = values;
       this.version = version;
+      if (values.isEmpty()) this.maxTerm = 0;
+      else this.maxTerm = Collections.max(values.values());
     }
 
     /**
-     * Can this replica become leader or is this replica's term equals to leader's term?
+     * Can {@code coreNodeName} become leader?
      * @param coreNodeName of the replica
-     * @return true if this replica can become leader, false if otherwise
+     * @return true if {@code coreNodeName} can become leader, false if otherwise
      */
     boolean canBecomeLeader(String coreNodeName) {
+      return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering");
+    }
+
+    /**
+     * Is {@code coreNodeName}'s term highest?
+     * @param coreNodeName of the replica
+     * @return true if term of {@code coreNodeName} is highest
+     */
+    boolean haveHighestTermValue(String coreNodeName) {
       if (values.isEmpty()) return true;
       long maxTerm = Collections.max(values.values());
       return values.getOrDefault(coreNodeName, 0L) == maxTerm;
@@ -428,6 +494,21 @@ public class ZkShardTerms implements AutoCloseable{
     }
 
     /**
+     * Return a new {@link Terms} in which highest terms are not zero
+     * @return null if highest terms are already larger than zero
+     */
+    Terms ensureHighestTermsAreNotZero() {
+      if (maxTerm > 0) return null;
+      else {
+        HashMap<String, Long> newValues = new HashMap<>(values);
+        for (String replica : values.keySet()) {
+          newValues.put(replica, 1L);
+        }
+        return new Terms(newValues, version);
+      }
+    }
+
+    /**
      * Return a new {@link Terms} in which term of {@code coreNodeName} is removed
      * @param coreNodeName of the replica
      * @return null if term of {@code coreNodeName} is already not exist
@@ -453,23 +534,70 @@ public class ZkShardTerms implements AutoCloseable{
       return new Terms(newValues, version);
     }
 
+    Terms setTermToZero(String coreNodeName) {
+      if (values.getOrDefault(coreNodeName, -1L) == 0) {
+        return null;
+      }
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.put(coreNodeName, 0L);
+      return new Terms(newValues, version);
+    }
+
     /**
      * Return a new {@link Terms} in which the term of {@code coreNodeName} is max
      * @param coreNodeName of the replica
      * @return null if term of {@code coreNodeName} is already maximum
      */
-    Terms setEqualsToMax(String coreNodeName) {
-      long maxTerm;
-      try {
-        maxTerm = Collections.max(values.values());
-      } catch (NoSuchElementException e){
-        maxTerm = 0;
-      }
+    Terms setTermEqualsToLeader(String coreNodeName) {
+      long maxTerm = getMaxTerm();
       if (values.get(coreNodeName) == maxTerm) return null;
 
       HashMap<String, Long> newValues = new HashMap<>(values);
       newValues.put(coreNodeName, maxTerm);
       return new Terms(newValues, version);
     }
+
+    long getMaxTerm() {
+      return maxTerm;
+    }
+
+    /**
+     * Mark {@code coreNodeName} as recovering
+     * @param coreNodeName of the replica
+     * @return null if {@code coreNodeName} is already marked as doing recovering
+     */
+    Terms startRecovering(String coreNodeName) {
+      long maxTerm = getMaxTerm();
+      if (values.get(coreNodeName) == maxTerm && values.getOrDefault(coreNodeName+"_recovering", -1L) == maxTerm)
+        return null;
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.put(coreNodeName, maxTerm);
+      newValues.put(coreNodeName+"_recovering", maxTerm);
+      return new Terms(newValues, version);
+    }
+
+    /**
+     * Mark {@code coreNodeName} as finished recovering
+     * @param coreNodeName of the replica
+     * @return null if {@code coreNodeName} is not currently marked as recovering
+     */
+    Terms doneRecovering(String coreNodeName) {
+      if (!values.containsKey(coreNodeName+"_recovering")) {
+        return null;
+      }
+
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      newValues.remove(coreNodeName+"_recovering");
+      return new Terms(newValues, version);
+    }
+
+    @Override
+    public String toString() {
+      return "Terms{" +
+          "values=" + values +
+          ", version=" + version +
+          '}';
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index bf25d69..c735071 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1132,6 +1132,8 @@ public class CoreContainer {
             if (leader != null && leader.getState() == State.ACTIVE) {
               log.info("Found active leader, will attempt to create fresh core and recover.");
               resetIndexDirectory(dcore, coreConfig);
+              // the index of this core is emptied, its term should be set to 0
+              getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(desc.getCoreNodeName());
               return new SolrCore(this, dcore, coreConfig);
             }
           } catch (SolrException se) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 4933559..db70796 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1149,7 +1149,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         if (optionalMaxTerm.isPresent()) {
           liveReplicas.stream()
               .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
-              .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+              .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index fbf24a1..179589b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -28,7 +28,6 @@ import org.apache.lucene.store.Directory;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
@@ -234,16 +233,6 @@ enum CoreAdminOperation implements CoreAdminOp {
     if (cname == null) {
       throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
     }
-    try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
-
-      // Setting the last published state for this core to be ACTIVE
-      if (core != null) {
-        core.getCoreDescriptor().getCloudDescriptor().setLastPublished(Replica.State.ACTIVE);
-        log().info("Setting the last published state for this core, {}, to {}", core.getName(), Replica.State.ACTIVE);
-      } else {
-        SolrException.log(log(), "Could not find core: " + cname);
-      }
-    }
   }),
 
   BACKUPCORE_OP(BACKUPCORE, new BackupCoreOp()),

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 3647735..d064e78 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -127,7 +127,10 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
 
               ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
               // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
-              if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+              if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && shardTerms.skipSendingUpdatesTo(coreNodeName)) {
+                // The replica changed its term, then published itself as RECOVERING.
+                // This core already sees the replica as RECOVERING,
+                // so it is guaranteed that a live-fetch will be enough for this core to see the max term published
                 shardTerms.refreshTerms();
               }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
index 03d1478..dbb2af0 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
@@ -20,8 +20,10 @@ package org.apache.solr.handler.admin;
 import java.net.URI;
 import java.util.Optional;
 
+import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.SolrCore;
@@ -61,11 +63,22 @@ class RestoreCoreOp implements CoreAdminHandler.CoreAdminOp {
 
     URI locationUri = repository.createURI(location);
     try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
+      CloudDescriptor cd = core.getCoreDescriptor().getCloudDescriptor();
+      // this core must be the only replica in its shard; otherwise we cannot guarantee
+      // consistency between replicas when we add data (or restore an index) to this replica
+      Slice slice = zkController.getClusterState().getCollection(cd.getCollectionName()).getSlice(cd.getShardId());
+      if (slice.getReplicas().size() != 1) {
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+            "Failed to restore core=" + core.getName() + ", the core must be the only replica in its shard");
+      }
       RestoreCore restoreCore = new RestoreCore(repository, core, locationUri, name);
       boolean success = restoreCore.doRestore();
       if (!success) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
       }
+      // replicas created later will know that they are out of date by
+      // comparing their term (0) with the term of this core (at least 1)
+      zkController.getShardTerms(cd.getCollectionName(), cd.getShardId()).ensureHighestTermsAreNotZero();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index 5267c75..5e924d8 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -23,6 +23,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -111,6 +113,16 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
           SolrCore newcore = it.handler.coreContainer.getCore(newCoreName);
           if (newcore != null) {
             newCores.add(newcore);
+            if (it.handler.coreContainer.isZooKeeperAware()) {
+              // this core must be the only replica in its shard otherwise
+              // we cannot guarantee consistency between replicas because when we add data to this replica
+              CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor();
+              ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
+              if (clusterState.getCollection(cd.getCollectionName()).getSlice(cd.getShardId()).getReplicas().size() != 1) {
+                throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+                    "Core with core name " + newCoreName + " must be the only replica in shard " + cd.getShardId());
+              }
+            }
           } else {
             throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
           }
@@ -123,6 +135,15 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
       SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
       core.getUpdateHandler().split(cmd);
 
+      if (it.handler.coreContainer.isZooKeeperAware()) {
+        for (SolrCore newcore : newCores) {
+          // the index of the core changed from empty to have some data, its term must be not zero
+          CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor();
+          ZkShardTerms zkShardTerms = it.handler.coreContainer.getZkController().getShardTerms(cd.getCollectionName(), cd.getShardId());
+          zkShardTerms.ensureHighestTermsAreNotZero();
+        }
+      }
+
       // After the split has completed, someone (here?) should start the process of replaying the buffered updates.
 
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index b37cb9c..5070582 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -173,6 +172,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private boolean forwardToLeader = false;
   private boolean isSubShardLeader = false;
   private List<Node> nodes;
+  private Set<String> skippedCoreNodeNames;
+  private boolean isIndexChanged = false;
 
   private UpdateCommand updateCommand;  // the current command this processor is working on.
     
@@ -334,9 +335,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         // that means I want to forward onto my replicas...
         // so get the replicas...
         forwardToLeader = false;
-        List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
-            .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
-        if (replicaProps == null) {
+        ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+        String leaderCoreNodeName = leaderReplica.getName();
+        List<Replica> replicas = clusterState.getCollection(collection)
+            .getSlice(shardId)
+            .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+        replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
+        if (replicas.isEmpty()) {
           return null;
         }
 
@@ -349,16 +354,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
         }
 
-        List<Node> nodes = new ArrayList<>(replicaProps.size());
+        List<Node> nodes = new ArrayList<>(replicas.size());
+        skippedCoreNodeNames = new HashSet<>();
         ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
-        for (ZkCoreNodeProps props : replicaProps) {
-          String coreNodeName = ((Replica) props.getNodeProps()).getName();
-          if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
-            log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
-          } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
-            log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
+        for (Replica replica: replicas) {
+          String coreNodeName = replica.getName();
+          if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
+            log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
+          } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
+            log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
+            skippedCoreNodeNames.add(replica.getName());
+          } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
+            skippedCoreNodeNames.add(replica.getName());
           } else {
-            nodes.add(new StdNode(props, collection, shardId));
+            nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
           }
         }
         return nodes;
@@ -750,6 +759,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
  
   // TODO: optionally fail if n replicas are not reached...
   private void doFinish() {
+    boolean shouldUpdateTerms = isLeader && !isOldLIRMode && isIndexChanged;
+    if (shouldUpdateTerms) {
+      ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+      if (skippedCoreNodeNames != null) {
+        zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames);
+      }
+      zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
+    }
     // TODO: if not a forward and replication req is not specified, we could
     // send in a background thread
 
@@ -758,7 +775,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // TODO - we may need to tell about more than one error...
 
     List<Error> errorsForClient = new ArrayList<>(errors.size());
-    Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
+    Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
     for (final SolrCmdDistributor.Error error : errors) {
       
       if (error.req.node instanceof RetryNode) {
@@ -856,9 +873,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
             Throwable rootCause = SolrException.getRootCause(error.e);
             if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
               log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
-              ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
-              failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
-              failedReplicas.get(shardInfo).add(coreNodeName);
+              replicasShouldBeInLowerTerms.add(coreNodeName);
             } else {
               // The replica did not registered its term, so it must run with old LIR implementation
               log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
@@ -891,11 +906,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         }
       }
     }
-    if (!isOldLIRMode) {
-      for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
-        ShardInfo shardInfo = entry.getKey();
-        zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
-      }
+    if (!isOldLIRMode && !replicasShouldBeInLowerTerms.isEmpty()) {
+      zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
+          .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
     }
     // in either case, we need to attach the achieved and min rf to the response.
     if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
@@ -928,48 +941,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       throw new DistributedUpdatesAsyncException(errorsForClient);
     }
   }
-
-  private class ShardInfo {
-    private String collection;
-    private String shard;
-    private String leader;
-
-    public ShardInfo(String collection, String shard, String leader) {
-      this.collection = collection;
-      this.shard = shard;
-      this.leader = leader;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      ShardInfo shardInfo = (ShardInfo) o;
-
-      if (!collection.equals(shardInfo.collection)) return false;
-      if (!shard.equals(shardInfo.shard)) return false;
-      return leader.equals(shardInfo.leader);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = collection.hashCode();
-      result = 31 * result + shard.hashCode();
-      result = 31 * result + leader.hashCode();
-      return result;
-    }
-  }
-
  
   // must be synchronized by bucket
   private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
     super.processAdd(cmd);
+    isIndexChanged = true; // read by doFinish(): shard terms are only updated when the index changed
   }
 
   // must be synchronized by bucket
   private void doLocalDelete(DeleteUpdateCommand cmd) throws IOException {
     super.processDelete(cmd);
+    isIndexChanged = true; // read by doFinish(): shard terms are only updated when the index changed
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
new file mode 100644
index 0000000..ee8bf51
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.JSONTestUtil;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestCloudConsistency extends SolrCloudTestCase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Map<JettySolrRunner, SocketProxy> proxies;
+  private static Map<URI, JettySolrRunner> jettys;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+    System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+    // Start a 4-node cluster with the "cloud-minimal" configset registered under the name "conf".
+    configureCluster(4)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    // Put a SocketProxy in front of every Jetty; the tests close and reopen these proxies to partition nodes.
+    proxies = new HashMap<>(cluster.getJettySolrRunners().size());
+    jettys = new HashMap<>();
+    for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+      SocketProxy proxy = new SocketProxy();
+      jetty.setProxyPort(proxy.getListenPort());
+      cluster.stopJettySolrRunner(jetty);// TODO: can we avoid this restart after setting the proxy port?
+      cluster.startJettySolrRunner(jetty);
+      proxy.open(jetty.getBaseUrl().toURI());
+      LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
+      proxies.put(jetty, proxy); // Jetty -> its proxy
+      jettys.put(proxy.getUrl(), jetty); // proxy URL -> Jetty (reverse lookup)
+    }
+  }
+
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    for (SocketProxy proxy:proxies.values()) { // close every proxy opened in setupCluster
+      proxy.close();
+    }
+    proxies = null; // drop the static references once the class is finished
+    jettys = null;
+  }
+
+  @Test
+  public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception {
+    testOutOfSyncReplicasCannotBecomeLeader(false); // false: replicas fall behind because of a network partition
+  }
+
+  @Test
+  public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception {
+    testOutOfSyncReplicasCannotBecomeLeader(true); // true: replicas fall behind because their nodes are stopped
+  }
+
+  public void testOutOfSyncReplicasCannotBecomeLeader(boolean onRestart) throws Exception { // shared scenario for both @Test methods
+    final String collectionName = "outOfSyncReplicasCannotBecomeLeader-"+onRestart;
+    CollectionAdminRequest.createCollection(collectionName, 1, 3)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1));
+    // The first replica was added on node 0 on its own; now add the other two on nodes 1 and 2.
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+        .setNode(cluster.getJettySolrRunner(2).getNodeName())
+        .process(cluster.getSolrClient());
+    waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3));
+    // Index docs 1..3 while all three replicas are present.
+    addDocs(collectionName, 3, 1);
+    // The scenarios below stop/partition by node index, so the leader must be the replica on node 0.
+    final Replica oldLeader = getCollectionState(collectionName).getSlice("shard1").getLeader();
+    assertEquals(cluster.getJettySolrRunner(0).getNodeName(), oldLeader.getNodeName());
+    // Index doc 4 while the other two replicas are stopped (onRestart) or network partitioned.
+    if (onRestart) {
+      addDocToWhenOtherReplicasAreDown(collectionName, oldLeader, 4);
+    } else {
+      addDocWhenOtherReplicasAreNetworkPartitioned(collectionName, oldLeader, 4);
+    }
+    // Once the helper has returned, docs 1..4 must be retrievable from every replica.
+    assertDocsExistInAllReplicas(getCollectionState(collectionName).getReplicas(), collectionName, 1, 4);
+
+    CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+  }
+
+
+  /**
+   * Adds a doc while the replicas other than the leader are down.
+   * Those replicas are out-of-sync, hence they must not become leader even when the current leader is DOWN.
+   * The leader is expected to be on node 0; fails if another replica becomes an ACTIVE leader within 10 seconds.
+   */
+  private void addDocToWhenOtherReplicasAreDown(String collection, Replica leader, int docId) throws Exception {
+    ChaosMonkey.stop(cluster.getJettySolrRunner(1));
+    ChaosMonkey.stop(cluster.getJettySolrRunner(2));
+    waitForState("Timeout waiting for both non-leader replicas to go DOWN", collection, (liveNodes, collectionState) ->
+      collectionState.getSlice("shard1").getReplicas().stream()
+          .filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2);
+    addDocs(collection, 1, docId); // nodes 1 and 2 are stopped, so only the leader's node can index this doc
+    ChaosMonkey.stop(cluster.getJettySolrRunner(0));
+    waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+    // Bring back only the two stale replicas and watch the shard leader for 10 seconds.
+    ChaosMonkey.start(cluster.getJettySolrRunner(1));
+    ChaosMonkey.start(cluster.getJettySolrRunner(2));
+    TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
+    while (!timeOut.hasTimedOut()) {
+      Replica newLeader = getCollectionState(collection).getSlice("shard1").getLeader();
+      if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+        fail("Out of sync replica became leader " + newLeader);
+      }
+      Thread.sleep(100); // poll periodically instead of spinning a CPU core for the whole timeout
+    }
+    // Restart the original leader's node; it must be leader again and all 3 replicas must become active.
+    ChaosMonkey.start(cluster.getJettySolrRunner(0));
+    waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
+      Replica newLeader = collectionState.getLeader("shard1");
+      return newLeader != null && newLeader.getName().equals(leader.getName());
+    });
+    waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
+  }
+
+
+  /**
+   * Adds a doc while the proxies of nodes 0..2 are closed, i.e. the replicas are network partitioned from the leader.
+   * The other replicas are out-of-sync, hence they must not become leader even when the current leader is DOWN.
+   * The leader is expected to be on node 0; fails if another replica becomes an ACTIVE leader within 10 seconds.
+   */
+  private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception {
+    for (int i = 0; i < 3; i++) {
+      proxies.get(cluster.getJettySolrRunner(i)).close();
+    }
+    addDoc(collection, docId, cluster.getJettySolrRunner(0));
+    ChaosMonkey.stop(cluster.getJettySolrRunner(0));
+    for (int i = 1; i < 3; i++) {
+      proxies.get(cluster.getJettySolrRunner(i)).reopen();
+    }
+    waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState)
+        -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+    // Nodes 1 and 2 are reachable again but missed the doc; watch the shard leader for 10 seconds.
+    TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
+    while (!timeOut.hasTimedOut()) {
+      Replica newLeader = getCollectionState(collection).getLeader("shard1");
+      if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+        fail("Out of sync replica became leader " + newLeader);
+      }
+      Thread.sleep(100); // poll periodically instead of spinning a CPU core for the whole timeout
+    }
+    proxies.get(cluster.getJettySolrRunner(0)).reopen(); // heal node 0's proxy before restarting the node
+    ChaosMonkey.start(cluster.getJettySolrRunner(0));
+    waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
+      Replica newLeader = collectionState.getLeader("shard1");
+      return newLeader != null && newLeader.getName().equals(leader.getName());
+    });
+    waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
+  }
+
+  private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
+    List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+    for (int i = 0; i < numDocs; i++) {
+      int id = startId + i; // ids are consecutive: startId .. startId + numDocs - 1
+      docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
+    }
+    cluster.getSolrClient().add(collection, docs); // sent as one batch through the cluster's client
+    cluster.getSolrClient().commit(collection); // followed by an explicit commit
+  }
+
+  private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) { // client bound to the given node's base URL
+      solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId))); // note: no commit is issued here
+    }
+  }
+
+  private void assertDocsExistInAllReplicas(List<Replica> notLeaders,
+                                              String testCollectionName, int firstDocId, int lastDocId) throws Exception {
+    // Asserts that docs firstDocId..lastDocId (inclusive) exist on the shard1 leader
+    // and on every replica in notLeaders.
+    // NOTE(review): the caller in this class passes all replicas of the collection, so the leader is checked twice.
+    Replica leader =
+        cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
+    List<HttpSolrClient> replicas = new ArrayList<>(notLeaders.size());
+    try (HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName)) {
+      // Replica clients are created inside the try: if one creation fails, the clients opened so far
+      // (including the leader's) are still closed instead of leaking.
+      for (Replica r : notLeaders) {
+        replicas.add(getHttpSolrClient(r, testCollectionName));
+      }
+      for (int d = firstDocId; d <= lastDocId; d++) {
+        String docId = String.valueOf(d);
+        assertDocExists(leaderSolr, testCollectionName, docId);
+        for (HttpSolrClient replicaSolr : replicas) {
+          assertDocExists(replicaSolr, testCollectionName, docId);
+        }
+      }
+    } finally {
+      // Runs after leaderSolr has already been closed by the try-with-resources.
+      for (HttpSolrClient replicaSolr : replicas) {
+        replicaSolr.close();
+      }
+    }
+  }
+
+  private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception { // NOTE(review): coll is not used in this method
+    NamedList rsp = realTimeGetDocId(solr, docId);
+    String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId); // compares the returned doc's /id with docId
+    assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+        + " due to: " + match + "; rsp="+rsp, match == null); // a null result is treated as success
+  }
+
+  private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false")); // "/get" request for one id, with distrib=false
+    return solr.request(qr); // raw response; callers read its "doc" entry
+  }
+
+  protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
+    ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
+    String url = zkProps.getBaseUrl() + "/" + coll; // <replica base URL>/<collection name>
+    return getHttpSolrClient(url); // the caller is responsible for closing the returned client
+  }
+
+
+  protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception { // NOTE(review): not called within this class
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+    // jettys is keyed by proxy URL (filled in setupCluster), so the replica's base URL must equal a proxy URL.
+    JettySolrRunner proxy = jettys.get(baseUrl.toURI()); // despite its name, this is the Jetty behind the proxy
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index 146f5c0..c10ec0f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -425,10 +425,19 @@ public class TestPullReplica extends SolrCloudTestCase {
     Replica pullReplica = docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
     assertTrue(pullReplica.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
 
+    long highestTerm = 0L;
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
+      highestTerm = zkShardTerms.getHighestTerm();
+    }
     // add document, this should fail since there is no leader. Pull replica should not accept the update
     expectThrows(SolrException.class, () -> 
       cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
     );
+    if (removeReplica) {
+      try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
+        assertEquals(highestTerm, zkShardTerms.getHighestTerm());
+      }
+    }
     
     // Also fails if I send the update to the pull replica explicitly
     try (HttpSolrClient pullReplicaClient = getHttpSolrClient(docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
@@ -436,6 +445,11 @@ public class TestPullReplica extends SolrCloudTestCase {
         cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
       );
     }
+    if (removeReplica) {
+      try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
+        assertEquals(highestTerm, zkShardTerms.getHighestTerm());
+      }
+    }
     
     // Queries should still work
     waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index 037517b..d557b29 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -94,7 +94,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     assertEquals(1L, rep1Terms.getTerm("rep1"));
 
     waitFor(1L, () -> rep2Terms.getTerm("rep1"));
-    rep2Terms.setEqualsToMax("rep2");
+    rep2Terms.setTermEqualsToLeader("rep2");
     assertEquals(1L, rep2Terms.getTerm("rep2"));
     rep2Terms.registerTerm("rep2");
     assertEquals(1L, rep2Terms.getTerm("rep2"));
@@ -138,7 +138,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
           while (!stop.get()) {
             try {
               Thread.sleep(random().nextInt(200));
-              zkShardTerms.setEqualsToMax(replica);
+              zkShardTerms.setTermEqualsToLeader(replica);
             } catch (InterruptedException e) {
               e.printStackTrace();
             }
@@ -178,7 +178,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     waitFor(1, count::get);
     leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
     waitFor(2, count::get);
-    replicaTerms.setEqualsToMax("replica");
+    replicaTerms.setTermEqualsToLeader("replica");
     waitFor(3, count::get);
     assertEquals(0, replicaTerms.getNumListeners());
 
@@ -194,6 +194,41 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     assertEquals(1L, terms.getTerm("leader").longValue());
   }
 
+  public void testSetTermToZero() { // setTermToZero must reset a term that was previously raised to 1
+    String collection = "setTermToZero";
+    try (ZkShardTerms terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) { // closed even when an assertion fails
+      terms.registerTerm("leader");
+      terms.registerTerm("replica");
+      terms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+      assertEquals(1L, terms.getTerm("leader"));
+      terms.setTermToZero("leader");
+      assertEquals(0L, terms.getTerm("leader"));
+    }
+  }
+
+  public void testReplicaCanBecomeLeader() throws InterruptedException {
+    String collection = "replicaCanBecomeLeader";
+    ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+    ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+    leaderTerms.registerTerm("leader");
+    replicaTerms.registerTerm("replica");
+
+    leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+    waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
+    waitFor(true, () -> leaderTerms.skipSendingUpdatesTo("replica"));
+
+    replicaTerms.startRecovering("replica");
+    waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
+    waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
+
+    replicaTerms.doneRecovering("replica");
+    waitFor(true, () -> replicaTerms.canBecomeLeader("replica"));
+    waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
+
+    leaderTerms.close();
+    replicaTerms.close();
+  }
+
   private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
     TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
     while (!timeOut.hasTimedOut()) {