You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2019/12/03 04:37:44 UTC
[lucene-solr] branch branch_8x updated: SOLR-13995: Move
ZkShardTerms.Terms to SolrJ
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 664d935 SOLR-13995: Move ZkShardTerms.Terms to SolrJ
664d935 is described below
commit 664d93591f45336d3e89df002c29124cd07f334d
Author: noble <no...@apache.org>
AuthorDate: Tue Dec 3 15:16:34 2019 +1100
SOLR-13995: Move ZkShardTerms.Terms to SolrJ
---
.../solr/cloud/RecoveringCoreTermWatcher.java | 3 +-
.../java/org/apache/solr/cloud/ZkShardTerms.java | 259 +++------------------
.../org/apache/solr/cloud/ZkShardTermsTest.java | 3 +-
.../apache/solr/client/solrj/cloud/ShardTerms.java | 256 ++++++++++++++++++++
4 files changed, 286 insertions(+), 235 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index 007d221..5d4ec17 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
@@ -44,7 +45,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
}
@Override
- public boolean onTermChanged(ZkShardTerms.Terms terms) {
+ public boolean onTermChanged(ShardTerms terms) {
if (coreContainer.isShutDown()) return false;
try (SolrCore solrCore = coreContainer.getCore(coreDescriptor.getName())) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 2c97164..2c2a24a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -18,15 +18,14 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -74,14 +73,12 @@ public class ZkShardTerms implements AutoCloseable{
private final Set<CoreTermWatcher> listeners = new HashSet<>();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private static final String RECOVERING_TERM_SUFFIX = "_recovering";
-
- private Terms terms;
+ private ShardTerms terms;
// Listener of a core for shard's term change events
interface CoreTermWatcher {
// return true if the listener wanna to be triggered in the next time
- boolean onTermChanged(Terms terms);
+ boolean onTermChanged(ShardTerms terms);
}
public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
@@ -103,12 +100,15 @@ public class ZkShardTerms implements AutoCloseable{
public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
if (replicasNeedingRecovery.isEmpty()) return;
- Terms newTerms;
+ ShardTerms newTerms;
while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
if (forceSaveTerms(newTerms)) return;
}
}
+ public ShardTerms getShardTerms() {
+ return terms;
+ }
/**
* Can this replica become leader?
* @param coreNodeName of the replica
@@ -148,7 +148,7 @@ public class ZkShardTerms implements AutoCloseable{
// package private for testing, only used by tests
Map<String, Long> getTerms() {
synchronized (writingLock) {
- return new HashMap<>(terms.values);
+ return terms.getTerms();
}
}
@@ -178,7 +178,7 @@ public class ZkShardTerms implements AutoCloseable{
// package private for testing, only used by tests
// return true if this object should not be reused
boolean removeTerm(String coreNodeName) {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.removeTerm(coreNodeName)) != null) {
try {
if (saveTerms(newTerms)) return false;
@@ -195,7 +195,7 @@ public class ZkShardTerms implements AutoCloseable{
* @param coreNodeName of the replica
*/
void registerTerm(String coreNodeName) {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
@@ -207,14 +207,14 @@ public class ZkShardTerms implements AutoCloseable{
* @param coreNodeName of the replica
*/
public void setTermEqualsToLeader(String coreNodeName) {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
public void setTermToZero(String coreNodeName) {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
@@ -224,7 +224,7 @@ public class ZkShardTerms implements AutoCloseable{
* Mark {@code coreNodeName} as recovering
*/
public void startRecovering(String coreNodeName) {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.startRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
@@ -234,27 +234,22 @@ public class ZkShardTerms implements AutoCloseable{
* Mark {@code coreNodeName} as finished recovering
*/
public void doneRecovering(String coreNodeName) {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
public boolean isRecovering(String name) {
- return terms.values.containsKey(recoveringTerm(name));
- }
-
- public static String recoveringTerm(String coreNodeName) {
- return coreNodeName + RECOVERING_TERM_SUFFIX;
+ return terms.isRecovering(name);
}
-
/**
* When first updates come in, all replicas have some data now,
* so we must switch from term 0 (registered) to 1 (have some data)
*/
public void ensureHighestTermsAreNotZero() {
- Terms newTerms;
+ ShardTerms newTerms;
while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) {
if (forceSaveTerms(newTerms)) break;
}
@@ -282,7 +277,7 @@ public class ZkShardTerms implements AutoCloseable{
* @param newTerms to be set
* @return true if terms is saved successfully to ZK, false if otherwise
*/
- private boolean forceSaveTerms(Terms newTerms) {
+ private boolean forceSaveTerms(ShardTerms newTerms) {
try {
return saveTerms(newTerms);
} catch (KeeperException.NoNodeException e) {
@@ -297,11 +292,11 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if terms is saved successfully to ZK, false if otherwise
* @throws KeeperException.NoNodeException correspond ZK term node is not created
*/
- private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
- byte[] znodeData = Utils.toJSON(newTerms.values);
+ private boolean saveTerms(ShardTerms newTerms) throws KeeperException.NoNodeException {
+ byte[] znodeData = Utils.toJSON(newTerms);
try {
- Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
- setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+ Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
+ setNewTerms(new ShardTerms(newTerms, stat.getVersion()));
log.info("Successful update of terms at {} to {}", znodePath, newTerms);
return true;
} catch (KeeperException.BadVersionException e) {
@@ -344,11 +339,11 @@ public class ZkShardTerms implements AutoCloseable{
* Fetch latest terms from ZK
*/
public void refreshTerms() {
- Terms newTerms;
+ ShardTerms newTerms;
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(znodePath, null, stat, true);
- newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+ newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
} catch (KeeperException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
@@ -411,10 +406,10 @@ public class ZkShardTerms implements AutoCloseable{
* Atomically update {@link ZkShardTerms#terms} and call listeners
* @param newTerms to be set
*/
- private void setNewTerms(Terms newTerms) {
+ private void setNewTerms(ShardTerms newTerms) {
boolean isChanged = false;
synchronized (writingLock) {
- if (terms == null || newTerms.version > terms.version) {
+ if (terms == null || newTerms.getVersion() > terms.getVersion()) {
terms = newTerms;
isChanged = true;
}
@@ -422,211 +417,9 @@ public class ZkShardTerms implements AutoCloseable{
if (isChanged) onTermUpdates(newTerms);
}
- private void onTermUpdates(Terms newTerms) {
+ private void onTermUpdates(ShardTerms newTerms) {
synchronized (listeners) {
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
}
}
-
- /**
- * Hold values of terms, this class is immutable
- */
- static class Terms {
- private final Map<String, Long> values;
- private final long maxTerm;
- // ZK node version
- private final int version;
-
- public Terms () {
- this(new HashMap<>(), 0);
- }
-
- public Terms(Map<String, Long> values, int version) {
- this.values = values;
- this.version = version;
- if (values.isEmpty()) this.maxTerm = 0;
- else this.maxTerm = Collections.max(values.values());
- }
-
- /**
- * Can {@code coreNodeName} become leader?
- * @param coreNodeName of the replica
- * @return true if {@code coreNodeName} can become leader, false if otherwise
- */
- boolean canBecomeLeader(String coreNodeName) {
- return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
- }
-
- /**
- * Is {@code coreNodeName}'s term highest?
- * @param coreNodeName of the replica
- * @return true if term of {@code coreNodeName} is highest
- */
- boolean haveHighestTermValue(String coreNodeName) {
- if (values.isEmpty()) return true;
- long maxTerm = Collections.max(values.values());
- return values.getOrDefault(coreNodeName, 0L) == maxTerm;
- }
-
- Long getTerm(String coreNodeName) {
- return values.get(coreNodeName);
- }
-
- /**
- * Return a new {@link Terms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
- * @param leader coreNodeName of leader
- * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
- * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
- */
- Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
- if (!values.containsKey(leader)) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
- }
-
- boolean changed = false;
- boolean foundReplicasInLowerTerms = false;
-
- HashMap<String, Long> newValues = new HashMap<>(values);
- long leaderTerm = newValues.get(leader);
- for (Map.Entry<String, Long> entry : newValues.entrySet()) {
- String key = entry.getKey();
- if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
- if (Objects.equals(entry.getValue(), leaderTerm)) {
- if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
- changed = true;
- } else {
- newValues.put(key, leaderTerm+1);
- }
- }
- }
-
- // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
- // this may indicate that the current value is stale
- if (!changed && foundReplicasInLowerTerms) return null;
- return new Terms(newValues, version);
- }
-
- private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
- if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
- key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
- }
- return replicasNeedingRecovery.contains(key);
- }
-
- /**
- * Return a new {@link Terms} in which highest terms are not zero
- * @return null if highest terms are already larger than zero
- */
- Terms ensureHighestTermsAreNotZero() {
- if (maxTerm > 0) return null;
- else {
- HashMap<String, Long> newValues = new HashMap<>(values);
- for (String replica : values.keySet()) {
- newValues.put(replica, 1L);
- }
- return new Terms(newValues, version);
- }
- }
-
- /**
- * Return a new {@link Terms} in which terms for the {@code coreNodeName} are removed
- * @param coreNodeName of the replica
- * @return null if term of {@code coreNodeName} is already not exist
- */
- Terms removeTerm(String coreNodeName) {
- if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
- return null;
- }
-
- HashMap<String, Long> newValues = new HashMap<>(values);
- newValues.remove(coreNodeName);
- newValues.remove(recoveringTerm(coreNodeName));
-
- return new Terms(newValues, version);
- }
-
- /**
- * Return a new {@link Terms} in which the associate term of {@code coreNodeName} is not null
- * @param coreNodeName of the replica
- * @return null if term of {@code coreNodeName} is already exist
- */
- Terms registerTerm(String coreNodeName) {
- if (values.containsKey(coreNodeName)) return null;
-
- HashMap<String, Long> newValues = new HashMap<>(values);
- newValues.put(coreNodeName, 0L);
- return new Terms(newValues, version);
- }
-
- Terms setTermToZero(String coreNodeName) {
- if (values.getOrDefault(coreNodeName, -1L) == 0) {
- return null;
- }
- HashMap<String, Long> newValues = new HashMap<>(values);
- newValues.put(coreNodeName, 0L);
- return new Terms(newValues, version);
- }
-
- /**
- * Return a new {@link Terms} in which the term of {@code coreNodeName} is max
- * @param coreNodeName of the replica
- * @return null if term of {@code coreNodeName} is already maximum
- */
- Terms setTermEqualsToLeader(String coreNodeName) {
- long maxTerm = getMaxTerm();
- if (values.get(coreNodeName) == maxTerm) return null;
-
- HashMap<String, Long> newValues = new HashMap<>(values);
- newValues.put(coreNodeName, maxTerm);
- newValues.remove(recoveringTerm(coreNodeName));
- return new Terms(newValues, version);
- }
-
- long getMaxTerm() {
- return maxTerm;
- }
-
- /**
- * Mark {@code coreNodeName} as recovering
- * @param coreNodeName of the replica
- * @return null if {@code coreNodeName} is already marked as doing recovering
- */
- Terms startRecovering(String coreNodeName) {
- long maxTerm = getMaxTerm();
- if (values.get(coreNodeName) == maxTerm)
- return null;
-
- HashMap<String, Long> newValues = new HashMap<>(values);
- if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
- long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
- // by keeping old term, we will have more information in leader election
- newValues.put(recoveringTerm(coreNodeName), currentTerm);
- }
- newValues.put(coreNodeName, maxTerm);
- return new Terms(newValues, version);
- }
-
- /**
- * Mark {@code coreNodeName} as finished recovering
- * @param coreNodeName of the replica
- * @return null if term of {@code coreNodeName} is already finished doing recovering
- */
- Terms doneRecovering(String coreNodeName) {
- if (!values.containsKey(recoveringTerm(coreNodeName))) {
- return null;
- }
-
- HashMap<String, Long> newValues = new HashMap<>(values);
- newValues.remove(recoveringTerm(coreNodeName));
- return new Terms(newValues, version);
- }
-
- @Override
- public String toString() {
- return "Terms{" +
- "values=" + values +
- ", version=" + version +
- '}';
- }
- }
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index a56202a..56ed8ae7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.cloud.ShardTerms;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
@@ -267,7 +268,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
public void testEnsureTermsIsHigher() {
Map<String, Long> map = new HashMap<>();
map.put("leader", 0L);
- ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0);
+ ShardTerms terms = new ShardTerms(map, 0);
terms = terms.increaseTerms("leader", Collections.singleton("replica"));
assertEquals(1L, terms.getTerm("leader").longValue());
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
new file mode 100644
index 0000000..3b2f754
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.cloud;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+
+/**
+ * Holds the values of terms. This class is immutable: every mutation returns a new instance.
+ */
+public class ShardTerms implements MapWriter {
+ private static final String RECOVERING_TERM_SUFFIX = "_recovering";
+ private final Map<String, Long> values;
+ private final long maxTerm;
+ // ZK node version
+ private final int version;
+
+ public ShardTerms () {
+ this(new HashMap<>(), 0);
+ }
+
+ public ShardTerms(ShardTerms newTerms, int version) {
+ this(newTerms.values, version);
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ values.forEach(ew.getBiConsumer());
+ }
+
+ public ShardTerms(Map<String, Long> values, int version) {
+ this.values = values;
+ this.version = version;
+ if (values.isEmpty()) this.maxTerm = 0;
+ else this.maxTerm = Collections.max(values.values());
+ }
+
+ /**
+ * Can {@code coreNodeName} become leader?
+ * @param coreNodeName of the replica
+ * @return true if {@code coreNodeName} can become leader, false if otherwise
+ */
+ public boolean canBecomeLeader(String coreNodeName) {
+ return haveHighestTermValue(coreNodeName) && !values.containsKey(recoveringTerm(coreNodeName));
+ }
+
+ /**
+ * Is {@code coreNodeName}'s term highest?
+ * @param coreNodeName of the replica
+ * @return true if term of {@code coreNodeName} is highest
+ */
+ public boolean haveHighestTermValue(String coreNodeName) {
+ if (values.isEmpty()) return true;
+ long maxTerm = Collections.max(values.values());
+ return values.getOrDefault(coreNodeName, 0L) == maxTerm;
+ }
+
+ public Long getTerm(String coreNodeName) {
+ return values.get(coreNodeName);
+ }
+
+ /**
+ * Return a new {@link ShardTerms} in which term of {@code leader} is higher than {@code replicasNeedingRecovery}
+ * @param leader coreNodeName of leader
+ * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
+ * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
+ */
+ public ShardTerms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
+ if (!values.containsKey(leader)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+ }
+
+ boolean changed = false;
+ boolean foundReplicasInLowerTerms = false;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ long leaderTerm = newValues.get(leader);
+ for (Map.Entry<String, Long> entry : newValues.entrySet()) {
+ String key = entry.getKey();
+ if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
+ if (Objects.equals(entry.getValue(), leaderTerm)) {
+ if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
+ changed = true;
+ } else {
+ newValues.put(key, leaderTerm+1);
+ }
+ }
+ }
+
+ // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
+ // this may indicate that the current value is stale
+ if (!changed && foundReplicasInLowerTerms) return null;
+ return new ShardTerms(newValues, version);
+ }
+
+ private boolean skipIncreaseTermOf(String key, Set<String> replicasNeedingRecovery) {
+ if (key.endsWith(RECOVERING_TERM_SUFFIX)) {
+ key = key.substring(0, key.length() - RECOVERING_TERM_SUFFIX.length());
+ }
+ return replicasNeedingRecovery.contains(key);
+ }
+
+ /**
+ * Return a new {@link ShardTerms} in which highest terms are not zero
+ * @return null if highest terms are already larger than zero
+ */
+ public ShardTerms ensureHighestTermsAreNotZero() {
+ if (maxTerm > 0) return null;
+ else {
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ for (String replica : values.keySet()) {
+ newValues.put(replica, 1L);
+ }
+ return new ShardTerms(newValues, version);
+ }
+ }
+
+ /**
+ * Return a new {@link ShardTerms} in which terms for the {@code coreNodeName} are removed
+ * @param coreNodeName of the replica
+ * @return null if no term exists for {@code coreNodeName}
+ */
+ public ShardTerms removeTerm(String coreNodeName) {
+ if (!values.containsKey(recoveringTerm(coreNodeName)) && !values.containsKey(coreNodeName)) {
+ return null;
+ }
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.remove(coreNodeName);
+ newValues.remove(recoveringTerm(coreNodeName));
+
+ return new ShardTerms(newValues, version);
+ }
+
+ /**
+ * Return a new {@link ShardTerms} in which {@code coreNodeName} has an associated term
+ * @param coreNodeName of the replica
+ * @return null if a term for {@code coreNodeName} already exists
+ */
+ public ShardTerms registerTerm(String coreNodeName) {
+ if (values.containsKey(coreNodeName)) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, 0L);
+ return new ShardTerms(newValues, version);
+ }
+
+ public ShardTerms setTermToZero(String coreNodeName) {
+ if (values.getOrDefault(coreNodeName, -1L) == 0) {
+ return null;
+ }
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, 0L);
+ return new ShardTerms(newValues, version);
+ }
+
+ /**
+ * Return a new {@link ShardTerms} in which the term of {@code coreNodeName} is max
+ * @param coreNodeName of the replica
+ * @return null if term of {@code coreNodeName} is already maximum
+ */
+ public ShardTerms setTermEqualsToLeader(String coreNodeName) {
+ long maxTerm = getMaxTerm();
+ if (values.get(coreNodeName) == maxTerm) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, maxTerm);
+ newValues.remove(recoveringTerm(coreNodeName));
+ return new ShardTerms(newValues, version);
+ }
+
+ public long getMaxTerm() {
+ return maxTerm;
+ }
+
+ /**
+ * Mark {@code coreNodeName} as recovering
+ * @param coreNodeName of the replica
+ * @return null if the term of {@code coreNodeName} already equals the max term
+ */
+ public ShardTerms startRecovering(String coreNodeName) {
+ long maxTerm = getMaxTerm();
+ if (values.get(coreNodeName) == maxTerm)
+ return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ if (!newValues.containsKey(recoveringTerm(coreNodeName))) {
+ long currentTerm = newValues.getOrDefault(coreNodeName, 0L);
+ // by keeping old term, we will have more information in leader election
+ newValues.put(recoveringTerm(coreNodeName), currentTerm);
+ }
+ newValues.put(coreNodeName, maxTerm);
+ return new ShardTerms(newValues, version);
+ }
+
+ /**
+ * Mark {@code coreNodeName} as finished recovering
+ * @param coreNodeName of the replica
+ * @return null if {@code coreNodeName} is not marked as recovering
+ */
+ public ShardTerms doneRecovering(String coreNodeName) {
+ if (!values.containsKey(recoveringTerm(coreNodeName))) {
+ return null;
+ }
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.remove(recoveringTerm(coreNodeName));
+ return new ShardTerms(newValues, version);
+ }
+
+ public static String recoveringTerm(String coreNodeName) {
+ return coreNodeName + RECOVERING_TERM_SUFFIX;
+ }
+
+ @Override
+ public String toString() {
+ return "Terms{" +
+ "values=" + values +
+ ", version=" + version +
+ '}';
+ }
+
+ public int getVersion() {
+ return version;
+ }
+
+ public Map<String , Long> getTerms() {
+ return new HashMap<>(this.values);
+ }
+
+ public boolean isRecovering(String name) {
+ return values.containsKey(recoveringTerm(name));
+ }
+}