You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/12/03 04:37:44 UTC

[lucene-solr] branch branch_8x updated: SOLR-13995: Move ZkShardTerms.Terms to SolrJ

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new 664d935  SOLR-13995: Move ZkShardTerms.Terms to SolrJ
664d935 is described below

commit 664d93591f45336d3e89df002c29124cd07f334d
Author: noble <no...@apache.org>
AuthorDate: Tue Dec 3 15:16:34 2019 +1100

    SOLR-13995: Move ZkShardTerms.Terms to SolrJ
---
 .../solr/cloud/RecoveringCoreTermWatcher.java      |   3 +-
 .../java/org/apache/solr/cloud/ZkShardTerms.java   | 259 +++------------------
 .../org/apache/solr/cloud/ZkShardTermsTest.java    |   3 +-
 .../apache/solr/client/solrj/cloud/ShardTerms.java | 256 ++++++++++++++++++++
 4 files changed, 286 insertions(+), 235 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index 007d221..5d4ec17 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.solr.client.solrj.cloud.ShardTerms;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
@@ -44,7 +45,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
   }
 
   @Override
-  public boolean onTermChanged(ZkShardTerms.Terms terms) {
+  public boolean onTermChanged(ShardTerms terms) {
     if (coreContainer.isShutDown()) return false;
 
     try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 2c97164..2c2a24a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -18,15 +18,14 @@
 package org.apache.solr.cloud;
 
 import java.lang.invoke.MethodHandles;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.solr.client.solrj.cloud.ShardTerms;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -74,14 +73,12 @@ public class ZkShardTerms implements AutoCloseable{
   private final Set<CoreTermWatcher> listeners = new HashSet<>();
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-  private static final String RECOVERING_TERM_SUFFIX = "_recovering";
-
-  private Terms terms;
+  private ShardTerms terms;
 
   // Listener of a core for shard's term change events
   interface CoreTermWatcher {
     // return true if the listener wanna to be triggered in the next time
-    boolean onTermChanged(Terms terms);
+    boolean onTermChanged(ShardTerms terms);
   }
 
   public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
@@ -103,12 +100,15 @@ public class ZkShardTerms implements AutoCloseable{
   public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
     if (replicasNeedingRecovery.isEmpty()) return;
 
-    Terms newTerms;
+    ShardTerms newTerms;
     while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
       if (forceSaveTerms(newTerms)) return;
     }
   }
 
+  public ShardTerms getShardTerms() {
+    return terms;
+  }
   /**
    * Can this replica become leader?
    * @param coreNodeName of the replica
@@ -148,7 +148,7 @@ public class ZkShardTerms implements AutoCloseable{
   // package private for testing, only used by tests
   Map<String, Long> getTerms() {
     synchronized (writingLock) {
-      return new HashMap<>(terms.values);
+      return terms.getTerms();
     }
   }
 
@@ -178,7 +178,7 @@ public class ZkShardTerms implements AutoCloseable{
   // package private for testing, only used by tests
   // return true if this object should not be reused
   boolean removeTerm(String coreNodeName) {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.removeTerm(coreNodeName)) != null) {
       try {
         if (saveTerms(newTerms)) return false;
@@ -195,7 +195,7 @@ public class ZkShardTerms implements AutoCloseable{
    * @param coreNodeName of the replica
    */
   void registerTerm(String coreNodeName) {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
@@ -207,14 +207,14 @@ public class ZkShardTerms implements AutoCloseable{
    * @param coreNodeName of the replica
    */
   public void setTermEqualsToLeader(String coreNodeName) {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
   }
 
   public void setTermToZero(String coreNodeName) {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
@@ -224,7 +224,7 @@ public class ZkShardTerms implements AutoCloseable{
    * Mark {@code coreNodeName} as recovering
    */
   public void startRecovering(String coreNodeName) {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.startRecovering(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
@@ -234,27 +234,22 @@ public class ZkShardTerms implements AutoCloseable{
    * Mark {@code coreNodeName} as finished recovering
    */
   public void doneRecovering(String coreNodeName) {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
   }
 
   public boolean isRecovering(String name) {
-    return terms.values.containsKey(recoveringTerm(name));
-  }
-
-  public static String recoveringTerm(String coreNodeName) {
-    return coreNodeName + RECOVERING_TERM_SUFFIX;
+    return terms.isRecovering(name);
   }
 
-
   /**
    * When first updates come in, all replicas have some data now,
    * so we must switch from term 0 (registered) to 1 (have some data)
    */
   public void ensureHighestTermsAreNotZero() {
-    Terms newTerms;
+    ShardTerms newTerms;
     while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
@@ -282,7 +277,7 @@ public class ZkShardTerms implements AutoCloseable{
    * @param newTerms to be set
    * @return true if terms is saved successfully to ZK, false if otherwise
    */
-  private boolean forceSaveTerms(Terms newTerms) {
+  private boolean forceSaveTerms(ShardTerms newTerms) {
     try {
       return saveTerms(newTerms);
     } catch (KeeperException.NoNodeException e) {
@@ -297,11 +292,11 @@ public class ZkShardTerms implements AutoCloseable{
    * @return true if terms is saved successfully to ZK, false if otherwise
    * @throws KeeperException.NoNodeException correspond ZK term node is not created
    */
-  private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
-    byte[] znodeData = Utils.toJSON(newTerms.values);
+  private boolean saveTerms(ShardTerms newTerms) throws KeeperException.NoNodeException {
+    byte[] znodeData = Utils.toJSON(newTerms);
     try {
-      Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
-      setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+      Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
+      setNewTerms(new ShardTerms(newTerms, stat.getVersion()));
       log.info("Successful update of terms at {} to {}", znodePath, newTerms);
       return true;
     } catch (KeeperException.BadVersionException e) {
@@ -344,11 +339,11 @@ public class ZkShardTerms implements AutoCloseable{
    * Fetch latest terms from ZK
    */
   public void refreshTerms() {
-    Terms newTerms;
+    ShardTerms newTerms;
     try {
       Stat stat = new Stat();
       byte[] data = zkClient.getData(znodePath, null, stat, true);
-      newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+      newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
     } catch (KeeperException e) {
       Thread.interrupted();
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
@@ -411,10 +406,10 @@ public class ZkShardTerms implements AutoCloseable{
    * Atomically update {@link ZkShardTerms#terms} and call listeners
    * @param newTerms to be set
    */
-  private void setNewTerms(Terms newTerms) {
+  private void setNewTerms(ShardTerms newTerms) {
     boolean isChanged = false;
     synchronized (writingLock) {
-      if (terms == null || newTerms.version > terms.version) {
+      if (terms == null || newTerms.getVersion() > terms.getVersion()) {
         terms = newTerms;
         isChanged = true;
       }
@@ -422,211 +417,9 @@ public class ZkShardTerms implements AutoCloseable{
     if (isChanged) onTermUpdates(newTerms);
   }
 
-  private void onTermUpdates(Terms newTerms) {
+  private void onTermUpdates(ShardTerms newTerms) {
     synchronized (listeners) {
       listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
     }
   }
-
-  /**
-   * Hold values of terms, this class is immutable
-   */
-  static class Terms {
-    private final Map<String, Long> values;
-    private final long maxTerm;
-    // ZK node version
-    private final int version;
-
-    public Terms () {
-      this(new HashMap<>(), 0);
-    }
-
-    public Terms(Map<String, Long> values, int version) {
-      this.values = values;
-      this.version = version;
-      if (values.isEmpty()) this.maxTerm = 0;
-      else this.maxTerm = Collections.max(values.values());
-    }
-
-    /**
-     * Can {@code coreNodeName} become leader?
-     * @param coreNodeName of the replica
-     * @return true if {@code coreNodeName} can become leader, false if otherwise
-     */
-    boolean canBecomeLeader(String coreNodeName) {
-      return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
-    }
-
-    /**
-     * Is {@code coreNodeName}'s term highest?
-     * @param coreNodeName of the replica
-     * @return true if term of {@code coreNodeName} is highest
-     */
-    boolean haveHighestTermValue(String coreNodeName) {
-      if (values.isEmpty()) return true;
-      long maxTerm = Collections.max(values.values());
-      return values.getOrDefault(coreNodeName, 0L) == maxTerm;
-    }
-
-    Long getTerm(String coreNodeName) {
-      return values.get(coreNodeName);
-    }
-
-    /**
-     * Return a new {@link Terms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
-     * @param leader coreNodeName of leader
-     * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
-     * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
-     */
-    Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
-      if (!values.containsKey(leader)) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
-      }
-
-      boolean changed = false;
-      boolean foundReplicasInLowerTerms = false;
-
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      long leaderTerm = newValues.get(leader);
-      for (Map.Entry<String, Long> entry : newValues.entrySet()) {
-        String key = entry.getKey();
-        if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
-        if (Objects.equals(entry.getValue(), leaderTerm)) {
-          if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
-            changed = true;
-          } else {
-            newValues.put(key, leaderTerm+1);
-          }
-        }
-      }
-
-      // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
-      // this may indicate that the current value is stale
-      if (!changed && foundReplicasInLowerTerms) return null;
-      return new Terms(newValues, version);
-    }
-
-    private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
-      if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
-        key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
-      }
-      return replicasNeedingRecovery.contains(key);
-    }
-
-    /**
-     * Return a new {@link Terms} in which highest terms are not zero
-     * @return null if highest terms are already larger than zero
-     */
-    Terms ensureHighestTermsAreNotZero() {
-      if (maxTerm > 0) return null;
-      else {
-        HashMap<String, Long> newValues = new HashMap<>(values);
-        for (String replica : values.keySet()) {
-          newValues.put(replica, 1L);
-        }
-        return new Terms(newValues, version);
-      }
-    }
-
-    /**
-     * Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed
-     * @param coreNodeName of the replica
-     * @return null if term of {@code coreNodeName} is already not exist
-     */
-    Terms removeTerm(String coreNodeName) {
-      if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
-        return null;
-      }
-
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      newValues.remove(coreNodeName);
-      newValues.remove(recoveringTerm(coreNodeName));
-
-      return new Terms(newValues, version);
-    }
-
-    /**
-     * Return a new {@link Terms} in which the associate term of {@code coreNodeName} is not null
-     * @param coreNodeName of the replica
-     * @return null if term of {@code coreNodeName} is already exist
-     */
-    Terms registerTerm(String coreNodeName) {
-      if (values.containsKey(coreNodeName)) return null;
-
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      newValues.put(coreNodeName, 0L);
-      return new Terms(newValues, version);
-    }
-
-    Terms setTermToZero(String coreNodeName) {
-      if (values.getOrDefault(coreNodeName, -1L) == 0) {
-        return null;
-      }
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      newValues.put(coreNodeName, 0L);
-      return new Terms(newValues, version);
-    }
-
-    /**
-     * Return a new {@link Terms} in which the term of {@code coreNodeName} is max
-     * @param coreNodeName of the replica
-     * @return null if term of {@code coreNodeName} is already maximum
-     */
-    Terms setTermEqualsToLeader(String coreNodeName) {
-      long maxTerm = getMaxTerm();
-      if (values.get(coreNodeName) == maxTerm) return null;
-
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      newValues.put(coreNodeName, maxTerm);
-      newValues.remove(recoveringTerm(coreNodeName));
-      return new Terms(newValues, version);
-    }
-
-    long getMaxTerm() {
-      return maxTerm;
-    }
-
-    /**
-     * Mark {@code coreNodeName} as recovering
-     * @param coreNodeName of the replica
-     * @return null if {@code coreNodeName} is already marked as doing recovering
-     */
-    Terms startRecovering(String coreNodeName) {
-      long maxTerm = getMaxTerm();
-      if (values.get(coreNodeName) == maxTerm)
-        return null;
-
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
-        long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
-        // by keeping old term, we will have more information in leader election
-        newValues.put(recoveringTerm(coreNodeName), currentTerm);
-      }
-      newValues.put(coreNodeName, maxTerm);
-      return new Terms(newValues, version);
-    }
-
-    /**
-     * Mark {@code coreNodeName} as finished recovering
-     * @param coreNodeName of the replica
-     * @return null if term of {@code coreNodeName} is already finished doing recovering
-     */
-    Terms doneRecovering(String coreNodeName) {
-      if (!values.containsKey(recoveringTerm(coreNodeName))) {
-        return null;
-      }
-
-      HashMap<String, Long> newValues = new HashMap<>(values);
-      newValues.remove(recoveringTerm(coreNodeName));
-      return new Terms(newValues, version);
-    }
-
-    @Override
-    public String toString() {
-      return "Terms{" +
-          "values=" + values +
-          ", version=" + version +
-          '}';
-    }
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index a56202a..56ed8ae7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardTerms;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.util.TimeOut;
@@ -267,7 +268,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
   public void testEnsureTermsIsHigher() {
     Map<String, Long> map = new HashMap<>();
     map.put("leader", 0L);
-    ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0);
+    ShardTerms terms = new ShardTerms(map, 0);
     terms = terms.increaseTerms("leader", Collections.singleton("replica"));
     assertEquals(1L, terms.getTerm("leader").longValue());
   }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
new file mode 100644
index 0000000..3b2f754
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.cloud;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+
+/**
+ * Holds the values of terms. This class is immutable; every mutation creates a new instance.
+ */
+public class ShardTerms implements MapWriter {
+  private static final String RECOVERING_TERM_SUFFIX = "_recovering";
+  private final Map<String, Long> values;
+  private final long maxTerm;
+  // ZK node version
+  private final int version;
+
+  public ShardTerms () {
+    this(new HashMap<>(), 0);
+  }
+
+  public ShardTerms(ShardTerms newTerms, int version) {
+    this(newTerms.values, version);
+  }
+
+  @Override
+  public void writeMap(EntryWriter ew) throws IOException {
+    values.forEach(ew.getBiConsumer());
+  }
+
+  public ShardTerms(Map<String, Long> values, int version) {
+    this.values = values;
+    this.version = version;
+    if (values.isEmpty()) this.maxTerm = 0;
+    else this.maxTerm = Collections.max(values.values());
+  }
+
+  /**
+   * Can {@code coreNodeName} become leader?
+   * @param coreNodeName of the replica
+   * @return true if {@code coreNodeName} can become leader, false if otherwise
+   */
+  public boolean canBecomeLeader(String coreNodeName) {
+    return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
+  }
+
+  /**
+   * Is {@code coreNodeName}'s term highest?
+   * @param coreNodeName of the replica
+   * @return true if term of {@code coreNodeName} is highest
+   */
+  public boolean haveHighestTermValue(String coreNodeName) {
+    if (values.isEmpty()) return true;
+    long maxTerm = Collections.max(values.values());
+    return values.getOrDefault(coreNodeName, 0L) == maxTerm;
+  }
+
+  public Long getTerm(String coreNodeName) {
+    return values.get(coreNodeName);
+  }
+
+  /**
+   * Return a new {@link ShardTerms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
+   * @param leader coreNodeName of leader
+   * @param replicasNeedingRecovery set of replicas whose terms should be lower than the leader's term
+   * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
+   */
+  public ShardTerms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
+    if (!values.containsKey(leader)) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+    }
+
+    boolean changed = false;
+    boolean foundReplicasInLowerTerms = false;
+
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    long leaderTerm = newValues.get(leader);
+    for (Map.Entry<String, Long> entry : newValues.entrySet()) {
+      String key = entry.getKey();
+      if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
+      if (Objects.equals(entry.getValue(), leaderTerm)) {
+        if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
+          changed = true;
+        } else {
+          newValues.put(key, leaderTerm+1);
+        }
+      }
+    }
+
+    // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
+    // this may indicate that the current value is stale
+    if (!changed && foundReplicasInLowerTerms) return null;
+    return new ShardTerms(newValues, version);
+  }
+
+  private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
+    if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
+      key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
+    }
+    return replicasNeedingRecovery.contains(key);
+  }
+
+  /**
+   * Return a new {@link ShardTerms} in which highest terms are not zero
+   * @return null if highest terms are already larger than zero
+   */
+  public ShardTerms ensureHighestTermsAreNotZero() {
+    if (maxTerm > 0) return null;
+    else {
+      HashMap<String, Long> newValues = new HashMap<>(values);
+      for (String replica : values.keySet()) {
+        newValues.put(replica, 1L);
+      }
+      return new ShardTerms(newValues, version);
+    }
+  }
+
+  /**
+   * Return a new {@link ShardTerms} in which terms for the {@code coreNodeName} are removed
+   * @param coreNodeName of the replica
+   * @return null if neither a term nor a recovering term exists for {@code coreNodeName}
+   */
+  public ShardTerms removeTerm(String coreNodeName) {
+    if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
+      return null;
+    }
+
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    newValues.remove(coreNodeName);
+    newValues.remove(recoveringTerm(coreNodeName));
+
+    return new ShardTerms(newValues, version);
+  }
+
+  /**
+   * Return a new {@link ShardTerms} in which {@code coreNodeName} has a registered term (initialized to 0)
+   * @param coreNodeName of the replica
+   * @return null if a term for {@code coreNodeName} already exists
+   */
+  public ShardTerms registerTerm(String coreNodeName) {
+    if (values.containsKey(coreNodeName)) return null;
+
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    newValues.put(coreNodeName, 0L);
+    return new ShardTerms(newValues, version);
+  }
+
+  public ShardTerms setTermToZero(String coreNodeName) {
+    if (values.getOrDefault(coreNodeName, -1L) == 0) {
+      return null;
+    }
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    newValues.put(coreNodeName, 0L);
+    return new ShardTerms(newValues, version);
+  }
+
+  /**
+   * Return a new {@link ShardTerms} in which the term of {@code coreNodeName} is max
+   * @param coreNodeName of the replica
+   * @return null if term of {@code coreNodeName} is already maximum
+   */
+  public ShardTerms setTermEqualsToLeader(String coreNodeName) {
+    long maxTerm = getMaxTerm();
+    if (values.get(coreNodeName) == maxTerm) return null;
+
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    newValues.put(coreNodeName, maxTerm);
+    newValues.remove(recoveringTerm(coreNodeName));
+    return new ShardTerms(newValues, version);
+  }
+
+  public long getMaxTerm() {
+    return maxTerm;
+  }
+
+  /**
+   * Mark {@code coreNodeName} as recovering
+   * @param coreNodeName of the replica
+   * @return null if the term of {@code coreNodeName} already equals the max term
+   */
+  public ShardTerms startRecovering(String coreNodeName) {
+    long maxTerm = getMaxTerm();
+    if (values.get(coreNodeName) == maxTerm)
+      return null;
+
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
+      long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
+      // by keeping old term, we will have more information in leader election
+      newValues.put(recoveringTerm(coreNodeName), currentTerm);
+    }
+    newValues.put(coreNodeName, maxTerm);
+    return new ShardTerms(newValues, version);
+  }
+
+  /**
+   * Mark {@code coreNodeName} as finished recovering
+   * @param coreNodeName of the replica
+   * @return null if {@code coreNodeName} is not marked as recovering
+   */
+  public ShardTerms doneRecovering(String coreNodeName) {
+    if (!values.containsKey(recoveringTerm(coreNodeName))) {
+      return null;
+    }
+
+    HashMap<String, Long> newValues = new HashMap<>(values);
+    newValues.remove(recoveringTerm(coreNodeName));
+    return new ShardTerms(newValues, version);
+  }
+
+  public static String recoveringTerm(String coreNodeName) {
+    return coreNodeName + RECOVERING_TERM_SUFFIX;
+  }
+
+  @Override
+  public String toString() {
+    return "Terms{" +
+        "values=" + values +
+        ", version=" + version +
+        '}';
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public Map<String , Long> getTerms() {
+    return new HashMap<>(this.values);
+  }
+
+  public boolean isRecovering(String name) {
+    return values.containsKey(recoveringTerm(name));
+  }
+}