You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/03/04 05:57:17 UTC
lucene-solr:master: SOLR-12011: Consistence problem when in-sync
replicas are DOWN
Repository: lucene-solr
Updated Branches:
refs/heads/master ad7e94afb -> 9de4225e9
SOLR-12011: Consistence problem when in-sync replicas are DOWN
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9de4225e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9de4225e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9de4225e
Branch: refs/heads/master
Commit: 9de4225e9a54ba987c2c7c9d4510bea3e4f9de97
Parents: ad7e94a
Author: Cao Manh Dat <da...@apache.org>
Authored: Sun Mar 4 12:57:05 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Sun Mar 4 12:57:05 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 5 +
.../org/apache/solr/cloud/CloudDescriptor.java | 2 +-
.../org/apache/solr/cloud/ElectionContext.java | 84 ++----
.../solr/cloud/RecoveringCoreTermWatcher.java | 2 +-
.../org/apache/solr/cloud/ZkController.java | 15 +-
.../org/apache/solr/cloud/ZkShardTerms.java | 156 ++++++++++-
.../org/apache/solr/core/CoreContainer.java | 2 +
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../solr/handler/admin/CoreAdminOperation.java | 11 -
.../solr/handler/admin/PrepRecoveryOp.java | 5 +-
.../solr/handler/admin/RestoreCoreOp.java | 13 +
.../org/apache/solr/handler/admin/SplitOp.java | 21 ++
.../processor/DistributedUpdateProcessor.java | 90 +++---
.../apache/solr/cloud/TestCloudConsistency.java | 276 +++++++++++++++++++
.../org/apache/solr/cloud/TestPullReplica.java | 14 +
.../org/apache/solr/cloud/ZkShardTermsTest.java | 41 ++-
16 files changed, 595 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7b4f878..e0ef777 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -78,6 +78,9 @@ Upgrade Notes
* LUCENE-8161: If you are using the spatial JTS library with Solr, you must upgrade to 1.15.0. This new version
of JTS is now dual-licensed to include a BSD style license.
+* SOLR-12011: Replicas which are not up-to-date are not allowed to become leader. Use FORCELEADER API to
+ allow these replicas to become leader.
+
New Features
----------------------
* SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -236,6 +239,8 @@ Bug Fixes
Also changed the display label in the Admin UI from routerField to router.field to match the actual API.
(Shawn Heisey via Cassandra Targett)
+* SOLR-12011: Consistence problem when in-sync replicas are DOWN. (Cao Manh Dat)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
index 32cb65b..068191e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/CloudDescriptor.java
@@ -42,7 +42,7 @@ public class CloudDescriptor {
// set to true once a core has registered in zk
// set to false on detecting a session expiration
private volatile boolean hasRegistered = false;
- volatile Replica.State lastPublished = Replica.State.ACTIVE;
+ private volatile Replica.State lastPublished = Replica.State.ACTIVE;
public static final String NUM_SHARDS = "numShards";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 2d00151..3ba4dfc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,9 +20,10 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -359,12 +360,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
-
+ String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
// should I be leader?
- if (weAreReplacement && !shouldIBeLeader(leaderProps, core, weAreReplacement)) {
+ if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+ && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+ log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
rejoinLeaderElection(core);
return;
}
+
+ if (isClosed) {
+ return;
+ }
log.info("I may be the new leader - try and sync");
@@ -516,8 +523,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkStateReader.forceUpdateCollection(collection);
ClusterState clusterState = zkStateReader.getClusterState();
Replica rep = getReplica(clusterState, collection, leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP));
- if (rep != null && rep.getState() != Replica.State.ACTIVE
- && rep.getState() != Replica.State.RECOVERING) {
+ if (rep != null && rep.getState() != Replica.State.ACTIVE) {
log.debug("We have become the leader after core registration but are not in an ACTIVE state - publishing ACTIVE");
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
}
@@ -593,40 +599,43 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
if (replicas != null && replicas.size() > 0) {
+ // set of replicas which are running in the new LIR but have lirState=DOWN
+ Set<String> replicasMustBeInLowerTerm = new HashSet<>();
for (String replicaCoreNodeName : replicas) {
if (coreNodeName.equals(replicaCoreNodeName))
continue; // added safe-guard so we don't mark this core as down
- if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
- // the replica registered its term so it is running with the new LIR implementation
- // we can put this replica into recovery by increase our terms
- zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
- continue;
- }
-
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
- List<ZkCoreNodeProps> replicaProps =
- zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
+ List<Replica> replicasProps =
+ zkController.getZkStateReader().getClusterState().getCollection(collection)
+ .getSlice(shardId).getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
- if (replicaProps != null && replicaProps.size() > 0) {
+ if (replicasProps != null && replicasProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
- for (ZkCoreNodeProps p : replicaProps) {
- if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
- coreNodeProps = p;
+ for (Replica p : replicasProps) {
+ if (p.getName().equals(replicaCoreNodeName)) {
+ coreNodeProps = new ZkCoreNodeProps(p);
break;
}
}
- zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
- collection, shardId, coreNodeProps, core.getCoreDescriptor(),
- false /* forcePublishState */);
+ if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+ replicasMustBeInLowerTerm.add(replicaCoreNodeName);
+ } else {
+ zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
+ collection, shardId, coreNodeProps, core.getCoreDescriptor(),
+ false /* forcePublishState */);
+ }
}
}
}
+ // these replicas registered their terms, so they are running with the new LIR implementation;
+ // we can put them into recovery by increasing our term
+ zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, replicasMustBeInLowerTerm);
}
} // core gets closed automagically
}
@@ -741,39 +750,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
leaderElector.joinElection(this, true);
}
- private boolean shouldIBeLeader(ZkNodeProps leaderProps, SolrCore core, boolean weAreReplacement) {
- log.debug("Checking if I should try and be the leader.");
-
- if (isClosed) {
- log.debug("Bailing on leader process because we have been closed");
- return false;
- }
-
- if (!weAreReplacement) {
- // we are the first node starting in the shard - there is a configurable wait
- // to make sure others participate in sync and leader election, we can be leader
- return true;
- }
-
- String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
- if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
- && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
- log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
- return false;
- }
-
- if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
- log.debug("My last published State was Active, it's okay to be the leader.");
- return true;
- }
- log.debug("My last published State was "
- + core.getCoreDescriptor().getCloudDescriptor().getLastPublished()
- + ", I won't be the leader.");
- // TODO: and if no one is a good candidate?
-
- return false;
- }
-
}
final class OverseerElectionContext extends ElectionContext {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
index 26fec97..90a500a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -48,7 +48,7 @@ public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
- if (terms.canBecomeLeader(coreNodeName)) return true;
+ if (terms.haveHighestTermValue(coreNodeName)) return true;
if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
lastTermDoRecovery.set(terms.getTerm(coreNodeName));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index cb1fcea..a159db5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1045,7 +1045,7 @@ public class ZkController {
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
- if (isRunningInNewLIR) {
+ if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) {
shardTerms.registerTerm(coreZkNodeName);
}
String shardId = cloudDesc.getShardId();
@@ -1455,13 +1455,20 @@ public class ZkController {
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
- if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
- getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
+ // pull replicas are excluded because their terms are not considered
+ if (state == Replica.State.RECOVERING && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
+ // state is used by clients; the state of a replica can change from RECOVERING to DOWN without the replica having finished recovery
+ // by calling this we will know whether a replica actually finished recovery or not
+ getShardTerms(collection, shardId).startRecovering(coreNodeName);
}
+ if (state == Replica.State.ACTIVE && isRunningInNewLIR && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
+ getShardTerms(collection, shardId).doneRecovering(coreNodeName);
+ }
+
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
- cd.getCloudDescriptor().lastPublished = state;
+ cd.getCloudDescriptor().setLastPublished(state);
}
overseerJobQueue.offer(Utils.toJSON(m));
} finally {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 7dc0d57..50b424c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
@@ -100,6 +99,8 @@ public class ZkShardTerms implements AutoCloseable{
* @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
*/
public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+ if (replicasNeedingRecovery.isEmpty()) return;
+
Terms newTerms;
while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
if (forceSaveTerms(newTerms)) return;
@@ -107,7 +108,7 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
- * Can this replica become leader or is this replica's term equals to leader's term?
+ * Can this replica become leader?
* @param coreNodeName of the replica
* @return true if this replica can become leader, false if otherwise
*/
@@ -116,6 +117,15 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
+ * Should leader skip sending updates to this replica?
+ * @param coreNodeName of the replica
+ * @return true if this replica does not have the highest term, false otherwise
+ */
+ public boolean skipSendingUpdatesTo(String coreNodeName) {
+ return !terms.haveHighestTermValue(coreNodeName);
+ }
+
+ /**
* Did this replica registered its term? This is a sign to check f
* @param coreNodeName of the replica
* @return true if this replica registered its term, false if otherwise
@@ -184,16 +194,59 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
- * Set a replica's term equals to leader's term
+ * Set a replica's term equals to leader's term.
+ * This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER}
* @param coreNodeName of the replica
*/
- public void setEqualsToMax(String coreNodeName) {
+ public void setTermEqualsToLeader(String coreNodeName) {
+ Terms newTerms;
+ while ( (newTerms = terms.setTermEqualsToLeader(coreNodeName)) != null) {
+ if (forceSaveTerms(newTerms)) break;
+ }
+ }
+
+ public void setTermToZero(String coreNodeName) {
+ Terms newTerms;
+ while ( (newTerms = terms.setTermToZero(coreNodeName)) != null) {
+ if (forceSaveTerms(newTerms)) break;
+ }
+ }
+
+ /**
+ * Mark {@code coreNodeName} as recovering
+ */
+ public void startRecovering(String coreNodeName) {
+ Terms newTerms;
+ while ( (newTerms = terms.startRecovering(coreNodeName)) != null) {
+ if (forceSaveTerms(newTerms)) break;
+ }
+ }
+
+ /**
+ * Mark {@code coreNodeName} as finished recovering
+ */
+ public void doneRecovering(String coreNodeName) {
Terms newTerms;
- while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
+ while ( (newTerms = terms.doneRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
+ /**
+ * When first updates come in, all replicas have some data now,
+ * so we must switch from term 0 (registered) to 1 (have some data)
+ */
+ public void ensureHighestTermsAreNotZero() {
+ Terms newTerms;
+ while ( (newTerms = terms.ensureHighestTermsAreNotZero()) != null) {
+ if (forceSaveTerms(newTerms)) break;
+ }
+ }
+
+ public long getHighestTerm() {
+ return terms.getMaxTerm();
+ }
+
public long getTerm(String coreNodeName) {
Long term = terms.getTerm(coreNodeName);
return term == null? -1 : term;
@@ -232,6 +285,7 @@ public class ZkShardTerms implements AutoCloseable{
try {
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+ log.info("Successful update terms at {} to {}", znodePath, newTerms);
return true;
} catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not match, retrying");
@@ -367,6 +421,7 @@ public class ZkShardTerms implements AutoCloseable{
*/
static class Terms {
private final Map<String, Long> values;
+ private final long maxTerm;
// ZK node version
private final int version;
@@ -377,14 +432,25 @@ public class ZkShardTerms implements AutoCloseable{
public Terms(Map<String, Long> values, int version) {
this.values = values;
this.version = version;
+ if (values.isEmpty()) this.maxTerm = 0;
+ else this.maxTerm = Collections.max(values.values());
}
/**
- * Can this replica become leader or is this replica's term equals to leader's term?
+ * Can {@code coreNodeName} become leader?
* @param coreNodeName of the replica
- * @return true if this replica can become leader, false if otherwise
+ * @return true if {@code coreNodeName} can become leader, false if otherwise
*/
boolean canBecomeLeader(String coreNodeName) {
+ return haveHighestTermValue(coreNodeName) && !values.containsKey(coreNodeName + "_recovering");
+ }
+
+ /**
+ * Is {@code coreNodeName}'s term highest?
+ * @param coreNodeName of the replica
+ * @return true if term of {@code coreNodeName} is highest
+ */
+ boolean haveHighestTermValue(String coreNodeName) {
if (values.isEmpty()) return true;
long maxTerm = Collections.max(values.values());
return values.getOrDefault(coreNodeName, 0L) == maxTerm;
@@ -428,6 +494,21 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
+ * Return a new {@link Terms} in which highest terms are not zero
+ * @return null if highest terms are already larger than zero
+ */
+ Terms ensureHighestTermsAreNotZero() {
+ if (maxTerm > 0) return null;
+ else {
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ for (String replica : values.keySet()) {
+ newValues.put(replica, 1L);
+ }
+ return new Terms(newValues, version);
+ }
+ }
+
+ /**
* Return a new {@link Terms} in which term of {@code coreNodeName} is removed
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already not exist
@@ -453,23 +534,70 @@ public class ZkShardTerms implements AutoCloseable{
return new Terms(newValues, version);
}
+ Terms setTermToZero(String coreNodeName) {
+ if (values.getOrDefault(coreNodeName, -1L) == 0) {
+ return null;
+ }
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, 0L);
+ return new Terms(newValues, version);
+ }
+
/**
* Return a new {@link Terms} in which the term of {@code coreNodeName} is max
* @param coreNodeName of the replica
* @return null if term of {@code coreNodeName} is already maximum
*/
- Terms setEqualsToMax(String coreNodeName) {
- long maxTerm;
- try {
- maxTerm = Collections.max(values.values());
- } catch (NoSuchElementException e){
- maxTerm = 0;
- }
+ Terms setTermEqualsToLeader(String coreNodeName) {
+ long maxTerm = getMaxTerm();
if (values.get(coreNodeName) == maxTerm) return null;
HashMap<String, Long> newValues = new HashMap<>(values);
newValues.put(coreNodeName, maxTerm);
return new Terms(newValues, version);
}
+
+ long getMaxTerm() {
+ return maxTerm;
+ }
+
+ /**
+ * Mark {@code coreNodeName} as recovering
+ * @param coreNodeName of the replica
+ * @return null if {@code coreNodeName} is already marked as doing recovering
+ */
+ Terms startRecovering(String coreNodeName) {
+ long maxTerm = getMaxTerm();
+ if (values.get(coreNodeName) == maxTerm && values.getOrDefault(coreNodeName+"_recovering", -1L) == maxTerm)
+ return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, maxTerm);
+ newValues.put(coreNodeName+"_recovering", maxTerm);
+ return new Terms(newValues, version);
+ }
+
+ /**
+ * Mark {@code coreNodeName} as finished recovering
+ * @param coreNodeName of the replica
+ * @return null if {@code coreNodeName} is not marked as recovering
+ */
+ Terms doneRecovering(String coreNodeName) {
+ if (!values.containsKey(coreNodeName+"_recovering")) {
+ return null;
+ }
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.remove(coreNodeName+"_recovering");
+ return new Terms(newValues, version);
+ }
+
+ @Override
+ public String toString() {
+ return "Terms{" +
+ "values=" + values +
+ ", version=" + version +
+ '}';
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/core/CoreContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index bf25d69..c735071 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1132,6 +1132,8 @@ public class CoreContainer {
if (leader != null && leader.getState() == State.ACTIVE) {
log.info("Found active leader, will attempt to create fresh core and recover.");
resetIndexDirectory(dcore, coreConfig);
+ // the index of this core is emptied, its term should be set to 0
+ getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(desc.getCoreNodeName());
return new SolrCore(this, dcore, coreConfig);
}
} catch (SolrException se) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 4933559..db70796 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1149,7 +1149,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (optionalMaxTerm.isPresent()) {
liveReplicas.stream()
.filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
- .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+ .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index fbf24a1..179589b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -28,7 +28,6 @@ import org.apache.lucene.store.Directory;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@@ -234,16 +233,6 @@ enum CoreAdminOperation implements CoreAdminOp {
if (cname == null) {
throw new IllegalArgumentException(CoreAdminParams.CORE + " is required");
}
- try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
-
- // Setting the last published state for this core to be ACTIVE
- if (core != null) {
- core.getCoreDescriptor().getCloudDescriptor().setLastPublished(Replica.State.ACTIVE);
- log().info("Setting the last published state for this core, {}, to {}", core.getName(), Replica.State.ACTIVE);
- } else {
- SolrException.log(log(), "Could not find core: " + cname);
- }
- }
}),
BACKUPCORE_OP(BACKUPCORE, new BackupCoreOp()),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 3647735..d064e78 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -127,7 +127,10 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
// if the replica is waiting for leader to see recovery state, the leader should refresh its terms
- if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+ if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && shardTerms.skipSendingUpdatesTo(coreNodeName)) {
+ // The replica changed its term, then published itself as RECOVERING.
+ // This core already sees the replica as RECOVERING,
+ // so it is guaranteed that a live-fetch will be enough for this core to see the max term published
shardTerms.refreshTerms();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
index 03d1478..dbb2af0 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RestoreCoreOp.java
@@ -20,8 +20,10 @@ package org.apache.solr.handler.admin;
import java.net.URI;
import java.util.Optional;
+import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.SolrCore;
@@ -61,11 +63,22 @@ class RestoreCoreOp implements CoreAdminHandler.CoreAdminOp {
URI locationUri = repository.createURI(location);
try (SolrCore core = it.handler.coreContainer.getCore(cname)) {
+ CloudDescriptor cd = core.getCoreDescriptor().getCloudDescriptor();
+ // this core must be the only replica in its shard, otherwise we cannot guarantee
+ // consistency between replicas when we add data (or restore an index) to this replica
+ Slice slice = zkController.getClusterState().getCollection(cd.getCollectionName()).getSlice(cd.getShardId());
+ if (slice.getReplicas().size() != 1) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
+ "Failed to restore core=" + core.getName() + ", the core must be the only replica in its shard");
+ }
RestoreCore restoreCore = new RestoreCore(repository, core, locationUri, name);
boolean success = restoreCore.doRestore();
if (!success) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to restore core=" + core.getName());
}
+ // other replicas to-be-created will know that they are out of date by
+ // looking at their term : 0 compared to the term of this core : 1
+ zkController.getShardTerms(cd.getCollectionName(), cd.getShardId()).ensureHighestTermsAreNotZero();
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
index 5267c75..5e924d8 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/SplitOp.java
@@ -23,6 +23,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -111,6 +113,16 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
SolrCore newcore = it.handler.coreContainer.getCore(newCoreName);
if (newcore != null) {
newCores.add(newcore);
+ if (it.handler.coreContainer.isZooKeeperAware()) {
+ // this core must be the only replica in its shard, otherwise we cannot guarantee
+ // consistency between replicas when we add data to this replica
+ CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor();
+ ClusterState clusterState = it.handler.coreContainer.getZkController().getClusterState();
+ if (clusterState.getCollection(cd.getCollectionName()).getSlice(cd.getShardId()).getReplicas().size() != 1) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Core with core name " + newCoreName + " must be the only replica in shard " + cd.getShardId());
+ }
+ }
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Core with core name " + newCoreName + " expected but doesn't exist.");
}
@@ -123,6 +135,15 @@ class SplitOp implements CoreAdminHandler.CoreAdminOp {
SplitIndexCommand cmd = new SplitIndexCommand(req, paths, newCores, ranges, router, routeFieldName, splitKey);
core.getUpdateHandler().split(cmd);
+ if (it.handler.coreContainer.isZooKeeperAware()) {
+ for (SolrCore newcore : newCores) {
+ // the index of the core changed from empty to having some data, so its term must not be zero
+ CloudDescriptor cd = newcore.getCoreDescriptor().getCloudDescriptor();
+ ZkShardTerms zkShardTerms = it.handler.coreContainer.getZkController().getShardTerms(cd.getCollectionName(), cd.getShardId());
+ zkShardTerms.ensureHighestTermsAreNotZero();
+ }
+ }
+
// After the split has completed, someone (here?) should start the process of replaying the buffered updates.
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index b37cb9c..5070582 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -173,6 +172,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private boolean forwardToLeader = false;
private boolean isSubShardLeader = false;
private List<Node> nodes;
+ private Set<String> skippedCoreNodeNames;
+ private boolean isIndexChanged = false;
private UpdateCommand updateCommand; // the current command this processor is working on.
@@ -334,9 +335,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
- List<ZkCoreNodeProps> replicaProps = zkController.getZkStateReader()
- .getReplicaProps(collection, shardId, leaderReplica.getName(), null, Replica.State.DOWN);
- if (replicaProps == null) {
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ String leaderCoreNodeName = leaderReplica.getName();
+ List<Replica> replicas = clusterState.getCollection(collection)
+ .getSlice(shardId)
+ .getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG));
+ replicas.removeIf((replica) -> replica.getName().equals(leaderCoreNodeName));
+ if (replicas.isEmpty()) {
return null;
}
@@ -349,16 +354,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
log.info("test.distrib.skip.servers was found and contains:" + skipListSet);
}
- List<Node> nodes = new ArrayList<>(replicaProps.size());
+ List<Node> nodes = new ArrayList<>(replicas.size());
+ skippedCoreNodeNames = new HashSet<>();
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
- for (ZkCoreNodeProps props : replicaProps) {
- String coreNodeName = ((Replica) props.getNodeProps()).getName();
- if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
- log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
- } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
- log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
+ for (Replica replica: replicas) {
+ String coreNodeName = replica.getName();
+ if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
+ log.info("check url:" + replica.getCoreUrl() + " against:" + skipListSet + " result:true");
+ } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && zkShardTerms.skipSendingUpdatesTo(coreNodeName)) {
+ log.debug("skip url:{} cause its term is less than leader", replica.getCoreUrl());
+ skippedCoreNodeNames.add(replica.getName());
+ } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
+ skippedCoreNodeNames.add(replica.getName());
} else {
- nodes.add(new StdNode(props, collection, shardId));
+ nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
}
}
return nodes;
@@ -750,6 +759,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO: optionally fail if n replicas are not reached...
private void doFinish() {
+ boolean shouldUpdateTerms = isLeader && !isOldLIRMode && isIndexChanged;
+ if (shouldUpdateTerms) {
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+ if (skippedCoreNodeNames != null) {
+ zkShardTerms.ensureTermsIsHigher(cloudDesc.getCoreNodeName(), skippedCoreNodeNames);
+ }
+ zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
+ }
// TODO: if not a forward and replication req is not specified, we could
// send in a background thread
@@ -758,7 +775,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO - we may need to tell about more than one error...
List<Error> errorsForClient = new ArrayList<>(errors.size());
- Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
+ Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
@@ -856,9 +873,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Throwable rootCause = SolrException.getRootCause(error.e);
if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
- ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
- failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
- failedReplicas.get(shardInfo).add(coreNodeName);
+ replicasShouldBeInLowerTerms.add(coreNodeName);
} else {
// The replica did not registered its term, so it must run with old LIR implementation
log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
@@ -891,11 +906,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
- if (!isOldLIRMode) {
- for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
- ShardInfo shardInfo = entry.getKey();
- zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
- }
+ if (!isOldLIRMode && !replicasShouldBeInLowerTerms.isEmpty()) {
+ zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
+ .ensureTermsIsHigher(cloudDesc.getCoreNodeName(), replicasShouldBeInLowerTerms);
}
// in either case, we need to attach the achieved and min rf to the response.
if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
@@ -928,48 +941,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
throw new DistributedUpdatesAsyncException(errorsForClient);
}
}
-
- private class ShardInfo {
- private String collection;
- private String shard;
- private String leader;
-
- public ShardInfo(String collection, String shard, String leader) {
- this.collection = collection;
- this.shard = shard;
- this.leader = leader;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- ShardInfo shardInfo = (ShardInfo) o;
-
- if (!collection.equals(shardInfo.collection)) return false;
- if (!shard.equals(shardInfo.shard)) return false;
- return leader.equals(shardInfo.leader);
- }
-
- @Override
- public int hashCode() {
- int result = collection.hashCode();
- result = 31 * result + shard.hashCode();
- result = 31 * result + leader.hashCode();
- return result;
- }
- }
-
// must be synchronized by bucket
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {
super.processAdd(cmd);
+ isIndexChanged = true; // flag read by doFinish() when deciding whether shard terms must be updated
}
// must be synchronized by bucket
private void doLocalDelete(DeleteUpdateCommand cmd) throws IOException {
super.processDelete(cmd);
+ isIndexChanged = true; // flag read by doFinish() when deciding whether shard terms must be updated
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
new file mode 100644
index 0000000..ee8bf51
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudConsistency.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.JSONTestUtil;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestCloudConsistency extends SolrCloudTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static Map<JettySolrRunner, SocketProxy> proxies;
+ private static Map<URI, JettySolrRunner> jettys;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
+ System.setProperty("solr.ulog.numRecordsToKeep", "1000");
+
+ configureCluster(4)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // Add proxies
+ proxies = new HashMap<>(cluster.getJettySolrRunners().size());
+ jettys = new HashMap<>();
+ for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+ SocketProxy proxy = new SocketProxy();
+ jetty.setProxyPort(proxy.getListenPort());
+ cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart
+ cluster.startJettySolrRunner(jetty);
+ proxy.open(jetty.getBaseUrl().toURI());
+ LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
+ proxies.put(jetty, proxy);
+ jettys.put(proxy.getUrl(), jetty);
+ }
+ }
+
+ @AfterClass
+ public static void tearDownCluster() throws Exception {
+ for (SocketProxy proxy:proxies.values()) {
+ proxy.close();
+ }
+ proxies = null;
+ jettys = null;
+ }
+
+ @Test
+ public void testOutOfSyncReplicasCannotBecomeLeader() throws Exception {
+ testOutOfSyncReplicasCannotBecomeLeader(false);
+ }
+
+ @Test
+ public void testOutOfSyncReplicasCannotBecomeLeaderAfterRestart() throws Exception {
+ testOutOfSyncReplicasCannotBecomeLeader(true);
+ }
+
+ public void testOutOfSyncReplicasCannotBecomeLeader(boolean onRestart) throws Exception {
+ final String collectionName = "outOfSyncReplicasCannotBecomeLeader-"+onRestart;
+ CollectionAdminRequest.createCollection(collectionName, 1, 3)
+ .setCreateNodeSet("")
+ .process(cluster.getSolrClient());
+ CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+ .setNode(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+ waitForState("Timeout waiting for shard leader", collectionName, clusterShape(1, 1));
+
+ CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+ .setNode(cluster.getJettySolrRunner(1).getNodeName())
+ .process(cluster.getSolrClient());
+ CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+ .setNode(cluster.getJettySolrRunner(2).getNodeName())
+ .process(cluster.getSolrClient());
+ waitForState("Timeout waiting for 1x3 collection", collectionName, clusterShape(1, 3));
+
+ addDocs(collectionName, 3, 1);
+
+ final Replica oldLeader = getCollectionState(collectionName).getSlice("shard1").getLeader();
+ assertEquals(cluster.getJettySolrRunner(0).getNodeName(), oldLeader.getNodeName());
+
+ if (onRestart) {
+ addDocToWhenOtherReplicasAreDown(collectionName, oldLeader, 4);
+ } else {
+ addDocWhenOtherReplicasAreNetworkPartitioned(collectionName, oldLeader, 4);
+ }
+
+ assertDocsExistInAllReplicas(getCollectionState(collectionName).getReplicas(), collectionName, 1, 4);
+
+ CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+ }
+
+
+ /**
+ * Adding doc when replicas (not leader) are down,
+ * These replicas are out-of-sync hence they should not become leader even when current leader is DOWN.
+ * Leader should be on node - 0
+ */
+ private void addDocToWhenOtherReplicasAreDown(String collection, Replica leader, int docId) throws Exception {
+ ChaosMonkey.stop(cluster.getJettySolrRunner(1));
+ ChaosMonkey.stop(cluster.getJettySolrRunner(2));
+ waitForState("", collection, (liveNodes, collectionState) ->
+ collectionState.getSlice("shard1").getReplicas().stream()
+ .filter(replica -> replica.getState() == Replica.State.DOWN).count() == 2);
+
+ addDocs(collection, 1, docId);
+ ChaosMonkey.stop(cluster.getJettySolrRunner(0));
+ waitForState("", collection, (liveNodes, collectionState) -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+
+ ChaosMonkey.start(cluster.getJettySolrRunner(1));
+ ChaosMonkey.start(cluster.getJettySolrRunner(2));
+ TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
+ while (!timeOut.hasTimedOut()) {
+ Replica newLeader = getCollectionState(collection).getSlice("shard1").getLeader();
+ if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+ fail("Out of sync replica became leader " + newLeader);
+ }
+ }
+
+ ChaosMonkey.start(cluster.getJettySolrRunner(0));
+ waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
+ Replica newLeader = collectionState.getLeader("shard1");
+ return newLeader != null && newLeader.getName().equals(leader.getName());
+ });
+ waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
+ }
+
+
+ /**
+ * Adding doc when replicas (not leader) are network partitioned with leader,
+ * These replicas are out-of-sync hence they should not become leader even when current leader is DOWN.
+ * Leader should be on node - 0
+ */
+ private void addDocWhenOtherReplicasAreNetworkPartitioned(String collection, Replica leader, int docId) throws Exception {
+ for (int i = 0; i < 3; i++) {
+ proxies.get(cluster.getJettySolrRunner(i)).close();
+ }
+ addDoc(collection, docId, cluster.getJettySolrRunner(0));
+ ChaosMonkey.stop(cluster.getJettySolrRunner(0));
+ for (int i = 1; i < 3; i++) {
+ proxies.get(cluster.getJettySolrRunner(i)).reopen();
+ }
+ waitForState("Timeout waiting for leader goes DOWN", collection, (liveNodes, collectionState)
+ -> collectionState.getReplica(leader.getName()).getState() == Replica.State.DOWN);
+
+ TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.CURRENT_TIME);
+ while (!timeOut.hasTimedOut()) {
+ Replica newLeader = getCollectionState(collection).getLeader("shard1");
+ if (newLeader != null && !newLeader.getName().equals(leader.getName()) && newLeader.getState() == Replica.State.ACTIVE) {
+ fail("Out of sync replica became leader " + newLeader);
+ }
+ }
+
+ proxies.get(cluster.getJettySolrRunner(0)).reopen();
+ ChaosMonkey.start(cluster.getJettySolrRunner(0));
+ waitForState("Timeout waiting for leader", collection, (liveNodes, collectionState) -> {
+ Replica newLeader = collectionState.getLeader("shard1");
+ return newLeader != null && newLeader.getName().equals(leader.getName());
+ });
+ waitForState("Timeout waiting for active collection", collection, clusterShape(1, 3));
+ }
+
+ private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
+ List<SolrInputDocument> docs = new ArrayList<>(numDocs);
+ for (int i = 0; i < numDocs; i++) {
+ int id = startId + i;
+ docs.add(new SolrInputDocument("id", String.valueOf(id), "fieldName_s", String.valueOf(id)));
+ }
+ cluster.getSolrClient().add(collection, docs);
+ cluster.getSolrClient().commit(collection);
+ }
+
+ private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
+ try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
+ solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
+ }
+ }
+
+ private void assertDocsExistInAllReplicas(List<Replica> notLeaders,
+ String testCollectionName, int firstDocId, int lastDocId) throws Exception {
+ Replica leader =
+ cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
+ HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName);
+ List<HttpSolrClient> replicas =
+ new ArrayList<HttpSolrClient>(notLeaders.size());
+
+ for (Replica r : notLeaders) {
+ replicas.add(getHttpSolrClient(r, testCollectionName));
+ }
+ try {
+ for (int d = firstDocId; d <= lastDocId; d++) {
+ String docId = String.valueOf(d);
+ assertDocExists(leaderSolr, testCollectionName, docId);
+ for (HttpSolrClient replicaSolr : replicas) {
+ assertDocExists(replicaSolr, testCollectionName, docId);
+ }
+ }
+ } finally {
+ if (leaderSolr != null) {
+ leaderSolr.close();
+ }
+ for (HttpSolrClient replicaSolr : replicas) {
+ replicaSolr.close();
+ }
+ }
+ }
+
+ private void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
+ NamedList rsp = realTimeGetDocId(solr, docId);
+ String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), docId);
+ assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ + " due to: " + match + "; rsp="+rsp, match == null);
+ }
+
+ private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
+ QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
+ return solr.request(qr);
+ }
+
+ protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
+ ZkCoreNodeProps zkProps = new ZkCoreNodeProps(replica);
+ String url = zkProps.getBaseUrl() + "/" + coll;
+ return getHttpSolrClient(url);
+ }
+
+
+ protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
+ String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+ assertNotNull(replicaBaseUrl);
+ URL baseUrl = new URL(replicaBaseUrl);
+
+ JettySolrRunner proxy = jettys.get(baseUrl.toURI());
+ assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+ return proxy;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
index 146f5c0..c10ec0f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestPullReplica.java
@@ -425,10 +425,19 @@ public class TestPullReplica extends SolrCloudTestCase {
Replica pullReplica = docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.PULL)).get(0);
assertTrue(pullReplica.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes()));
+ long highestTerm = 0L;
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
+ highestTerm = zkShardTerms.getHighestTerm();
+ }
// add document, this should fail since there is no leader. Pull replica should not accept the update
expectThrows(SolrException.class, () ->
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
);
+ if (removeReplica) {
+ try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
+ assertEquals(highestTerm, zkShardTerms.getHighestTerm());
+ }
+ }
// Also fails if I send the update to the pull replica explicitly
try (HttpSolrClient pullReplicaClient = getHttpSolrClient(docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).get(0).getCoreUrl())) {
@@ -436,6 +445,11 @@ public class TestPullReplica extends SolrCloudTestCase {
cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo"))
);
}
+ if (removeReplica) {
+ try(ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, "shard1", zkClient())) {
+ assertEquals(highestTerm, zkShardTerms.getHighestTerm());
+ }
+ }
// Queries should still work
waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9de4225e/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index 037517b..d557b29 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -94,7 +94,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertEquals(1L, rep1Terms.getTerm("rep1"));
waitFor(1L, () -> rep2Terms.getTerm("rep1"));
- rep2Terms.setEqualsToMax("rep2");
+ rep2Terms.setTermEqualsToLeader("rep2");
assertEquals(1L, rep2Terms.getTerm("rep2"));
rep2Terms.registerTerm("rep2");
assertEquals(1L, rep2Terms.getTerm("rep2"));
@@ -138,7 +138,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
while (!stop.get()) {
try {
Thread.sleep(random().nextInt(200));
- zkShardTerms.setEqualsToMax(replica);
+ zkShardTerms.setTermEqualsToLeader(replica);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -178,7 +178,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
waitFor(1, count::get);
leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
waitFor(2, count::get);
- replicaTerms.setEqualsToMax("replica");
+ replicaTerms.setTermEqualsToLeader("replica");
waitFor(3, count::get);
assertEquals(0, replicaTerms.getNumListeners());
@@ -194,6 +194,41 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertEquals(1L, terms.getTerm("leader").longValue());
}
+ /**
+  * Registers "leader" and "replica", puts the leader's term above the replica's (expected
+  * to be 1 afterwards) and checks that setTermToZero brings the leader's term back to 0.
+  */
+ public void testSetTermToZero() {
+   String collection = "setTermToZero";
+   // try-with-resources: the terms object is closed even when one of the assertions fails
+   try (ZkShardTerms terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+     terms.registerTerm("leader");
+     terms.registerTerm("replica");
+     terms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+     assertEquals(1L, terms.getTerm("leader"));
+     terms.setTermToZero("leader");
+     assertEquals(0L, terms.getTerm("leader"));
+   }
+ }
+
+ public void testReplicaCanBecomeLeader() throws InterruptedException {
+ String collection = "replicaCanBecomeLeader";
+ ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+ ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+ leaderTerms.registerTerm("leader");
+ replicaTerms.registerTerm("replica");
+
+ leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+ waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
+ waitFor(true, () -> leaderTerms.skipSendingUpdatesTo("replica"));
+
+ replicaTerms.startRecovering("replica");
+ waitFor(false, () -> replicaTerms.canBecomeLeader("replica"));
+ waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
+
+ replicaTerms.doneRecovering("replica");
+ waitFor(true, () -> replicaTerms.canBecomeLeader("replica"));
+ waitFor(false, () -> leaderTerms.skipSendingUpdatesTo("replica"));
+
+ leaderTerms.close();
+ replicaTerms.close();
+ }
+
private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {