You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/04/17 13:17:27 UTC
lucene-solr:branch_7x: SOLR-12187: Replica should watch clusterstate
and unload itself if its entry is removed
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x 94adf9d2f -> 174c11f2c
SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/174c11f2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/174c11f2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/174c11f2
Branch: refs/heads/branch_7x
Commit: 174c11f2c49314160ba7e48dc5d796c3ceff8256
Parents: 94adf9d
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Apr 17 20:16:31 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Apr 17 20:17:14 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/ZkController.java | 136 ++++++++++++++-----
.../java/org/apache/solr/core/ZkContainer.java | 16 ---
.../solr/handler/admin/CollectionsHandler.java | 41 +-----
.../apache/solr/cloud/DeleteReplicaTest.java | 84 ++++++++++--
.../org/apache/solr/cloud/ForceLeaderTest.java | 75 ----------
.../org/apache/solr/cloud/MoveReplicaTest.java | 17 ---
.../apache/solr/common/cloud/ZkStateReader.java | 8 +-
8 files changed, 186 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 945c9a2..e625baa 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -144,6 +144,8 @@ Bug Fixes
* SOLR-10169: PeerSync will hit an NPE on no response errors when looking for fingerprint. (Erick Erickson)
+* SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed (Cao Manh Dat)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 872a8b9..8cd02b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -65,6 +66,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DefaultConnectionStrategy;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
@@ -1033,42 +1035,39 @@ public class ZkController {
try {
// pre register has published our down state
final String baseUrl = getBaseUrl();
-
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
final String collection = cloudDesc.getCollectionName();
-
- final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
+ final String shardId = cloudDesc.getShardId();
+ final String coreZkNodeName = cloudDesc.getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
+ // check replica's existence in clusterstate first
+ try {
+ zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
+ TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+ } catch (TimeoutException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
+ }
+ Replica replica = getReplicaOrNull(zkStateReader.getClusterState().getCollectionOrNull(collection), shardId, coreZkNodeName);
+ if (replica == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate");
+ }
+
ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
// This flag is used for testing rolling updates and should be removed in SOLR-11812
boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
- if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) {
+ if (isRunningInNewLIR && replica.getType() != Type.PULL) {
shardTerms.registerTerm(coreZkNodeName);
}
- String shardId = cloudDesc.getShardId();
- Map<String,Object> props = new HashMap<>();
- // we only put a subset of props into the leader node
- props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
- props.put(ZkStateReader.CORE_NAME_PROP, coreName);
- props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-
+
log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
- coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
-
- ZkNodeProps leaderProps = new ZkNodeProps(props);
+ coreName, baseUrl, collection, shardId);
try {
// If we're a preferred leader, insert ourselves at the head of the queue
- boolean joinAtHead = false;
- final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
- Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreZkNodeName);
- if (replica != null) {
- joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
- }
- //TODO WHy would replica be null?
- if (replica == null || replica.getType() != Type.PULL) {
+ boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+ if (replica.getType() != Type.PULL) {
joinElection(desc, afterExpiration, joinAtHead);
} else if (replica.getType() == Type.PULL) {
if (joinAtHead) {
@@ -1093,9 +1092,8 @@ public class ZkController {
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
boolean isLeader = leaderUrl.equals(ourUrl);
- Replica.Type replicaType = zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType();
- assert !(isLeader && replicaType == Type.PULL): "Pull replica became leader!";
-
+ assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
+
try (SolrCore core = cc.getCore(desc.getName())) {
// recover from local transaction log and wait for it to complete before
@@ -1105,7 +1103,7 @@ public class ZkController {
// leader election perhaps?
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- boolean isTlogReplicaAndNotLeader = replicaType == Replica.Type.TLOG && !isLeader;
+ boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
if (isTlogReplicaAndNotLeader) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
@@ -1138,23 +1136,40 @@ public class ZkController {
publish(desc, Replica.State.ACTIVE);
}
- if (isRunningInNewLIR && replicaType != Type.PULL) {
+ if (isRunningInNewLIR && replica.getType() != Type.PULL) {
+ // the watcher is added to a set, so multiple calls of this method will leave only one watcher
shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
}
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
+ } catch (Exception e) {
+ unregister(coreName, desc, false);
+ throw e;
}
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
+ // the watcher is added to a set, so multiple calls of this method will leave only one watcher
+ zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(),
+ new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
return shardId;
- } catch (Exception e) {
- unregister(coreName, desc, false);
- throw e;
} finally {
MDCLoggingContext.clear();
}
}
+ private Replica getReplicaOrNull(DocCollection docCollection, String shard, String coreNodeName) {
+ if (docCollection == null) return null;
+
+ Slice slice = docCollection.getSlice(shard);
+ if (slice == null) return null;
+
+ Replica replica = slice.getReplica(coreNodeName);
+ if (replica == null) return null;
+ if (!getNodeName().equals(replica.getNodeName())) return null;
+
+ return replica;
+ }
+
public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
log.info("{} starting background replication from leader", coreName);
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
@@ -1359,11 +1374,7 @@ public class ZkController {
}
public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception {
- publish(cd, state, true);
- }
-
- public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
- publish(cd, state, updateLastState, false);
+ publish(cd, state, true, false);
}
/**
@@ -1430,6 +1441,9 @@ public class ZkController {
props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
props.put(ZkStateReader.COLLECTION_PROP, collection);
props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
+ if (!Overseer.isLegacy(zkStateReader)) {
+ props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
+ }
if (numShards != null) {
props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
}
@@ -1521,7 +1535,6 @@ public class ZkController {
}
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
- zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
if (removeCoreFromZk) {
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
@@ -1653,7 +1666,6 @@ public class ZkController {
"Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" :
"Registering watch for collection {}",
collectionName);
- zkStateReader.registerCore(collectionName);
} catch (KeeperException e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -2707,6 +2719,56 @@ public class ZkController {
};
}
+ private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher {
+ String coreNodeName;
+ String shard;
+ String coreName;
+
+ public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String coreName) {
+ this.coreNodeName = coreNodeName;
+ this.shard = shard;
+ this.coreName = coreName;
+ }
+
+ @Override
+ // synchronized due to SOLR-11535
+ public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
+
+ boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
+ if (replicaRemoved) {
+ try {
+ log.info("Replica {} removed from clusterstate, remove it.", coreName);
+ getCoreContainer().unload(coreName, true, true, true);
+ } catch (SolrException e) {
+ if (!e.getMessage().contains("Cannot unload non-existent core")) {
+ // no need to log if the core was already unloaded
+ log.warn("Failed to unregister core:{}", coreName, e);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to unregister core:{}", coreName, e);
+ }
+ }
+ return replicaRemoved;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ UnloadCoreOnDeletedWatcher that = (UnloadCoreOnDeletedWatcher) o;
+ return Objects.equals(coreNodeName, that.coreNodeName) &&
+ Objects.equals(shard, that.shard) &&
+ Objects.equals(coreName, that.coreName);
+ }
+
+ @Override
+ public int hashCode() {
+
+ return Objects.hash(coreNodeName, shard, coreName);
+ }
+ }
+
/**
* Thrown during leader initiated recovery process if current node is not leader
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/core/src/java/org/apache/solr/core/ZkContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index f89367f..34e5764 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -222,22 +222,6 @@ public class ZkContainer {
public ZkController getZkController() {
return zkController;
}
-
- public void publishCoresAsDown(List<SolrCore> cores) {
-
- for (SolrCore core : cores) {
- try {
- zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
- } catch (KeeperException e) {
- ZkContainer.log.error("", e);
- } catch (InterruptedException e) {
- Thread.interrupted();
- ZkContainer.log.error("", e);
- } catch (Exception e) {
- ZkContainer.log.error("", e);
- }
- }
- }
public void close() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 5f4bc01..c02271e 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -40,7 +40,6 @@ import org.apache.solr.api.Api;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
@@ -282,7 +281,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
* In SOLR-11739 we change the way the async IDs are checked to decide if one has
* already been used or not. For backward compatibility, we continue to check in the
* old way (meaning, in all the queues) for now. This extra check should be removed
- * in Solr 9
+ * in Solr 9
*/
private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
@@ -306,7 +305,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
NamedList<String> r = new NamedList<>();
-
+
if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
@@ -1162,26 +1161,15 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
// Wait till we have an active leader
boolean success = false;
- for (int i = 0; i < 10; i++) {
- ZkCoreNodeProps zombieLeaderProps = getZombieLeader(zkController, collectionName, sliceId);
- if (zombieLeaderProps != null) {
- log.warn("A replica {} on node {} won the leader election, but not exist in clusterstate, " +
- "remove it and waiting for another round of election",
- zombieLeaderProps.getCoreName(), zombieLeaderProps.getNodeName());
- try (HttpSolrClient solrClient = new HttpSolrClient.Builder(zombieLeaderProps.getBaseUrl()).build()) {
- CoreAdminRequest.unloadCore(zombieLeaderProps.getCoreName(), solrClient);
- }
- // waiting for another election round
- i = 0;
- }
- clusterState = zkController.getClusterState();
+ for (int i = 0; i < 9; i++) {
+ Thread.sleep(5000);
+ clusterState = handler.coreContainer.getZkController().getClusterState();
collection = clusterState.getCollection(collectionName);
slice = collection.getSlice(sliceId);
if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
success = true;
break;
}
- Thread.sleep(5000);
log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice);
}
@@ -1198,25 +1186,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
}
- /**
- * Zombie leader is a replica won the election but does not exist in clusterstate
- * @return null if the zombie leader does not exist
- */
- private static ZkCoreNodeProps getZombieLeader(ZkController zkController, String collection, String shardId) {
- try {
- ZkCoreNodeProps leaderProps = zkController.getLeaderProps(collection, shardId, 1000);
- DocCollection docCollection = zkController.getClusterState().getCollection(collection);
- Replica replica = docCollection.getReplica(leaderProps.getNodeProps().getStr(ZkStateReader.CORE_NODE_NAME_PROP));
- if (replica == null) return leaderProps;
- if (!replica.getNodeName().equals(leaderProps.getNodeName())) {
- return leaderProps;
- }
- return null;
- } catch (Exception e) {
- return null;
- }
- }
-
public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
throws KeeperException, InterruptedException {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index d9dbba0..8c11713 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -34,11 +35,13 @@ import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZkStateReaderAccessor;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.ZkContainer;
@@ -86,12 +89,17 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("state is 'active'"));
assertTrue("Data directory for " + replica.getName() + " should not have been deleted", Files.exists(dataDir));
+ JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
+ ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
+ Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
.process(cluster.getSolrClient());
waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
Slice testShard = c.getSlice(shard.getName());
return testShard.getReplica(replica.getName()) == null;
});
+ // the core no longer watches the collection state since it was removed
+ assertEquals(watchers.size() - 1, accessor.getStateWatchers(collectionName).size());
assertFalse("Data directory for " + replica.getName() + " should have been removed", Files.exists(dataDir));
@@ -165,8 +173,63 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
}
@Test
+ public void deleteReplicaFromClusterState() throws Exception {
+ deleteReplicaFromClusterState("true");
+ deleteReplicaFromClusterState("false");
+ CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
+ }
+
+ public void deleteReplicaFromClusterState(String legacyCloud) throws Exception {
+ CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
+ final String collectionName = "deleteFromClusterState_"+legacyCloud;
+ CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3)
+ .process(cluster.getSolrClient());
+ cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1"));
+ cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2"));
+ cluster.getSolrClient().commit(collectionName);
+
+ Slice shard = getCollectionState(collectionName).getSlice("shard1");
+ Replica replica = getRandomReplica(shard);
+ JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
+ ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
+ Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
+
+ ZkNodeProps m = new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+ ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
+ ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
+ ZkStateReader.COLLECTION_PROP, collectionName,
+ ZkStateReader.CORE_NODE_NAME_PROP, replica.getName(),
+ ZkStateReader.BASE_URL_PROP, replica.getBaseUrl());
+ Overseer.getStateUpdateQueue(cluster.getZkClient()).offer(Utils.toJSON(m));
+
+ waitForState("Timeout waiting for replica get deleted", collectionName,
+ (liveNodes, collectionState) -> collectionState.getSlice("shard1").getReplicas().size() == 2);
+
+ TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ timeOut.waitFor("Waiting for replica get unloaded", () ->
+ replicaJetty.getCoreContainer().getCoreDescriptor(replica.getCoreName()) == null
+ );
+ // the core no longer watches the collection state since it was removed
+ timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ timeOut.waitFor("Waiting for watcher get removed", () ->
+ watchers.size() - 1 == accessor.getStateWatchers(collectionName).size()
+ );
+
+ CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+ }
+
+ @Test
+ @Slow
public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
- final String collectionName = "raceDeleteReplica";
+ raceConditionOnDeleteAndRegisterReplica("true");
+ raceConditionOnDeleteAndRegisterReplica("false");
+ CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
+ }
+
+ public void raceConditionOnDeleteAndRegisterReplica(String legacyCloud) throws Exception {
+ CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
+ final String collectionName = "raceDeleteReplica_"+legacyCloud;
CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
.process(cluster.getSolrClient());
waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
@@ -246,15 +309,16 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
ZkContainer.testing_beforeRegisterInZk = null;
}
-
- waitForState("Timeout for replica:"+replica1.getName()+" register itself as DOWN after failed to register", collectionName, (liveNodes, collectionState) -> {
- Slice shard = collectionState.getSlice("shard1");
- Replica replica = shard.getReplica(replica1.getName());
- return replica != null && replica.getState() == DOWN;
- });
-
- CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
- .process(cluster.getSolrClient());
+ while (true) {
+ try {
+ CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+ .process(cluster.getSolrClient());
+ break;
+ } catch (Exception e) {
+ // expected, when the node is not fully started
+ Thread.sleep(500);
+ }
+ }
waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
String leaderJettyNodeName = leaderJetty.getNodeName();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index beaeb24..013434c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -63,81 +63,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
/**
- * Tests that FORCELEADER can get an active leader even in the case there are a replica won the election but not present in clusterstate
- */
- @Test
- @Slow
- public void testZombieLeader() throws Exception {
- String testCollectionName = "forceleader_zombie_leader_collection";
- createCollection(testCollectionName, "conf1", 1, 3, 1);
- cloudClient.setDefaultCollection(testCollectionName);
- try {
- List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
- assertEquals("Expected 2 replicas for collection " + testCollectionName
- + " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
- List<JettySolrRunner> notLeaderJetties = notLeaders.stream().map(rep -> getJettyOnPort(getReplicaPort(rep)))
- .collect(Collectors.toList());
-
- Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
- JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-
- // remove leader from clusterstate
- ZkNodeProps m = new ZkNodeProps(
- Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
- ZkStateReader.CORE_NAME_PROP, leader.getCoreName(),
- ZkStateReader.NODE_NAME_PROP, leader.getNodeName(),
- ZkStateReader.COLLECTION_PROP, testCollectionName,
- ZkStateReader.CORE_NODE_NAME_PROP, leader.getName(),
- ZkStateReader.BASE_URL_PROP, leader.getBaseUrl());
- Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()).offer(Utils.toJSON(m));
-
- boolean restartOtherReplicas = random().nextBoolean();
- log.info("Starting test with restartOtherReplicas:{}", restartOtherReplicas);
- if (restartOtherReplicas) {
- for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
- notLeaderJetty.stop();
- }
- }
- cloudClient.waitForState(testCollectionName, 30, TimeUnit.SECONDS,
- (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
-
- if (restartOtherReplicas) {
- for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
- notLeaderJetty.start();
- }
- }
-
- log.info("Before forcing leader: " + cloudClient.getZkStateReader().getClusterState()
- .getCollection(testCollectionName).getSlice(SHARD1));
- doForceLeader(cloudClient, testCollectionName, SHARD1);
-
- // By now we have an active leader. Wait for recoveries to begin
- waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
- ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
- log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
-
- assertNull("Expected zombie leader get deleted", leaderJetty.getCoreContainer().getCore(leader.getCoreName()));
- Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
- assertNotNull(newLeader);
- assertEquals(State.ACTIVE, newLeader.getState());
-
- int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
- assertEquals(2, numActiveReplicas);
-
- // Assert that indexing works again
- sendDoc(1);
- cloudClient.commit();
-
- assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
- } finally {
- log.info("Cleaning up after the test.");
- // try to clean up
- attemptCollectionDelete(cloudClient, testCollectionName);
- }
- }
-
- /**
* Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live
*/
@Test
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index 395b1d8..6fcbd80 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -58,9 +58,6 @@ import org.slf4j.LoggerFactory;
public class MoveReplicaTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static ZkStateReaderAccessor accessor;
- private static int overseerLeaderIndex;
-
// used by MoveReplicaHDFSTest
protected boolean inPlaceMove = true;
@@ -76,14 +73,12 @@ public class MoveReplicaTest extends SolrCloudTestCase {
JettySolrRunner jetty = cluster.getJettySolrRunner(i);
if (jetty.getNodeName().equals(overseerLeader)) {
overseerJetty = jetty;
- overseerLeaderIndex = i;
break;
}
}
if (overseerJetty == null) {
fail("no overseer leader!");
}
- accessor = new ZkStateReaderAccessor(overseerJetty.getCoreContainer().getZkController().getZkStateReader());
}
protected String getSolrXml() {
@@ -135,8 +130,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
}
}
- Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
-
int sourceNumCores = getNumOfCores(cloudClient, replica.getNodeName(), coll);
int targetNumCores = getNumOfCores(cloudClient, targetNode, coll);
@@ -199,9 +192,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
- Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
- assertEquals(watchers, newWatchers);
-
moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId);
moveReplica.setInPlaceMove(inPlaceMove);
moveReplica.process(cloudClient);
@@ -241,8 +231,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
}
}
assertTrue("replica never fully recovered", recovered);
- newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
- assertEquals(watchers, newWatchers);
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
}
@@ -256,8 +244,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
CloudSolrClient cloudClient = cluster.getSolrClient();
- Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
-
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
create.setAutoAddReplicas(false);
cloudClient.request(create);
@@ -301,9 +287,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
}
assertFalse(success);
- Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
- assertEquals(watchers, newWatchers);
-
log.info("--- current collection state: " + cloudClient.getZkStateReader().getClusterState().getCollection(coll));
assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/174c11f2/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index b0b591a..7d5401d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1572,8 +1572,12 @@ public class ZkStateReader implements Closeable {
return v;
});
for (CollectionStateWatcher watcher : watchers) {
- if (watcher.onStateChanged(liveNodes, collectionState)) {
- removeCollectionStateWatcher(collection, watcher);
+ try {
+ if (watcher.onStateChanged(liveNodes, collectionState)) {
+ removeCollectionStateWatcher(collection, watcher);
+ }
+ } catch (Throwable throwable) {
+ LOG.warn("Error on calling watcher", throwable);
}
}
}