You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/11 07:42:25 UTC
lucene-solr:jira/solr-11702: SOLR-11702: Support ForceLeaderAPI
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11702 583a4a549 -> c16385142
SOLR-11702: Support ForceLeaderAPI
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c1638514
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c1638514
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c1638514
Branch: refs/heads/jira/solr-11702
Commit: c163851429779c2e25423da34103dc43b7b493b9
Parents: 583a4a5
Author: Cao Manh Dat <da...@apache.org>
Authored: Thu Jan 11 14:42:10 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Thu Jan 11 14:42:10 2018 +0700
----------------------------------------------------------------------
.../org/apache/solr/cloud/ElectionContext.java | 17 +--
.../org/apache/solr/cloud/ZkShardTerms.java | 7 +-
.../solr/handler/admin/CollectionsHandler.java | 45 ++++--
.../org/apache/solr/cloud/ForceLeaderTest.java | 145 +++++++++++++++++++
4 files changed, 192 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index d6efd1d..6302fe2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -313,14 +313,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
- if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
- && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
- // no need to do recovery here, we already has term less than leader term, the recovery will be triggered later
- log.info("Can not become leader, this core has term less than leader's term");
- cancelElection();
- leaderElector.joinElection(this, true);
- return;
- }
MDCLoggingContext.setCore(core);
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
}
@@ -765,7 +757,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// to make sure others participate in sync and leader election, we can be leader
return true;
}
-
+
+ String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+ if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+ && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+ log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
+ return false;
+ }
+
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
log.debug("My last published State was Active, it's okay to be the leader.");
return true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 69ae990..335a07b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -181,13 +181,18 @@ public class ZkShardTerms implements AutoCloseable{
* Set a replica's term equals to leader's term
* @param coreNodeName of the replica
*/
- void setEqualsToMax(String coreNodeName) {
+ public void setEqualsToMax(String coreNodeName) {
Terms newTerms;
while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
if (forceSaveTerms(newTerms)) break;
}
}
+ /**
+ * Get the term currently recorded for a replica
+ * @param coreNodeName of the replica
+ * @return the replica's term, or -1 if the terms lookup returns no value (null) for it
+ */
+ public long getTerm(String coreNodeName) {
+ final Long recordedTerm = terms.getTerm(coreNodeName);
+ if (recordedTerm == null) {
+ return -1;
+ }
+ return recordedTerm;
+ }
+
// package private for testing, only used by tests
int getNumListeners() {
synchronized (listeners) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index aba32b4..c05afa4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,8 +28,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.io.IOUtils;
@@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
@@ -959,7 +962,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
- ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+ ZkController zkController = handler.coreContainer.getZkController();
+ ClusterState clusterState = zkController.getClusterState();
String collectionName = req.getParams().required().get(COLLECTION_PROP);
String sliceId = req.getParams().required().get(SHARD_ID_PROP);
@@ -971,7 +975,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
"No shard with name " + sliceId + " exists for collection " + collectionName);
}
- try {
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
// if an active replica is the leader, then all is fine already
Replica leader = slice.getLeader();
if (leader != null && leader.getState() == State.ACTIVE) {
@@ -989,20 +993,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
handler.coreContainer.getZkController().getZkClient().clean(lirPath);
}
+ final Set<String> liveNodes = clusterState.getLiveNodes();
+ List<Replica> liveReplicas = slice.getReplicas().stream()
+ .filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
+ boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
+ .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
+ // we won't increase replica's terms if exist a live replica with term equals to leader
+ if (shouldIncreaseReplicaTerms) {
+ OptionalLong optionalMaxTerm = liveReplicas.stream()
+ .filter(rep -> zkShardTerms.registered(rep.getName()))
+ .mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
+ .max();
+ // and increase terms of replicas less out-of-sync
+ if (optionalMaxTerm.isPresent()) {
+ liveReplicas.stream()
+ .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
+ .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+ }
+ }
+
// Call all live replicas to prepare themselves for leadership, e.g. set last published
// state to active.
- for (Replica rep : slice.getReplicas()) {
- if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
- ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+ for (Replica rep : liveReplicas) {
+ ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
- params.set(CoreAdminParams.CORE, rep.getStr("core"));
- String nodeName = rep.getNodeName();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+ params.set(CoreAdminParams.CORE, rep.getStr("core"));
+ String nodeName = rep.getNodeName();
- OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
- CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
- }
+ OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+ CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
}
// Wait till we have an active leader
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 9ce3036..49b243f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -59,6 +59,11 @@ public class ForceLeaderTest extends HttpPartitionTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final boolean onlyLeaderIndexes = random().nextBoolean();
+ /**
+ * Whether this run uses TLOG replicas; decided by the randomly initialized {@code onlyLeaderIndexes} field.
+ */
+ @Override
+ protected boolean useTlogReplicas() {
+ return this.onlyLeaderIndexes;
+ }
+
@Test
@Override
@Ignore
@@ -66,6 +71,146 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
+ /**
+ * Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live
+ */
+ @Test
+ @Slow
+ public void testReplicasInLowerTerms() throws Exception {
+ handle.put("maxScore", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ String testCollectionName = "forceleader_lower_terms_collection";
+ createCollection(testCollectionName, "conf1", 1, 3, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ try {
+ List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+ assertEquals("Expected 2 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+ JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+ ZkController zkController = notLeader0.getCoreContainer().getZkController();
+
+ log.info("Before put non leaders into lower term: " + printClusterStateInfo());
+ putNonLeadersIntoLowerTerm(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+ cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+ "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+ int numReplicasOnLiveNodes = 0;
+ for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
+ if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+ numReplicasOnLiveNodes++;
+ }
+ }
+ assertEquals(2, numReplicasOnLiveNodes);
+ log.info("Before forcing leader: " + printClusterStateInfo());
+ // Assert there is no leader yet
+ assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
+ clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
+
+ assertSendDocFails(3);
+
+ log.info("Do force leader...");
+ doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+ // By now we have an active leader. Wait for recoveries to begin
+ waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+ cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+ clusterState = cloudClient.getZkStateReader().getClusterState();
+ log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
+ // we have a leader
+ Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
+ assertNotNull(newLeader);
+ // leader is active
+ assertEquals(State.ACTIVE, newLeader.getState());
+
+ numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ assertEquals(2, numActiveReplicas);
+
+ // Assert that indexing works again
+ log.info("Sending doc 4...");
+ sendDoc(4);
+ log.info("Committing...");
+ cloudClient.commit();
+ log.info("Doc 4 sent and commit issued");
+
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+ // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+ log.info("Checking doc counts...");
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("q", "*:*");
+ assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+ bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+ } finally {
+ log.info("Cleaning up after the test.");
+ // try to clean up
+ attemptCollectionDelete(cloudClient, testCollectionName);
+ }
+ }
+
+ /**
+ * Leaves the non-leader replicas with a term lower than the leader's term.
+ * Partitions the non-leaders from the leader, indexes doc 2 on the leader during the partition,
+ * stops the leader, then polls (up to 20 times, 1s apart) until the shard has no leader and all
+ * non-leaders are RECOVERING. Finally reopens the proxies and asserts each non-leader's term is
+ * strictly below the leader's term.
+ */
+ void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+ SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+ for (int i = 0; i < notLeaders.size(); i++) {
+ nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+ }
+
+ sendDoc(1);
+
+ // ok, now introduce a network partition between the leader and both replicas
+ log.info("Closing proxies for the non-leader replicas...");
+ for (SocketProxy proxy : nonLeaderProxies) {
+ proxy.close();
+ }
+ getProxyForReplica(leader).close();
+
+ // indexing during a partition
+ log.info("Sending a doc during the network partition...");
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+ sendDoc(2, null, leaderJetty);
+
+ // Kill the leader
+ log.info("Killing leader for {} of {} on node {}", shard, collectionName, leader.getNodeName());
+ leaderJetty.stop();
+
+ // Wait for a steady state: the shard is leaderless and every non-leader is RECOVERING
+ log.info("Sleep and periodically wake up to check for state...");
+ boolean steadyState = false;
+ for (int i = 0; i < 20; i++) {
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ boolean allRecoveries = true;
+ for (Replica notLeader : notLeaders) {
+ if (clusterState.getCollection(collectionName).getReplica(notLeader.getName()).getState() != State.RECOVERING) {
+ allRecoveries = false;
+ break;
+ }
+ }
+ if (allRecoveries && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
+ steadyState = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ if (!steadyState) {
+ // not fatal here, but surface it so a later assertion failure in the caller is explainable
+ log.warn("Shard {} of {} did not become leaderless with all non-leaders RECOVERING; state: {}",
+ shard, collectionName, printClusterStateInfo(collectionName));
+ }
+ log.info("Waking up...");
+
+ // remove the network partition
+ log.info("Reopening the proxies for the non-leader replicas...");
+ for (SocketProxy proxy : nonLeaderProxies) {
+ proxy.reopen();
+ }
+
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, shard, cloudClient.getZkStateReader().getZkClient())) {
+ for (Replica notLeader : notLeaders) {
+ long leaderTerm = zkShardTerms.getTerm(leader.getName());
+ long replicaTerm = zkShardTerms.getTerm(notLeader.getName());
+ assertTrue("Expected term of " + notLeader.getName() + " (" + replicaTerm
+ + ") to be lower than leader's term (" + leaderTerm + ")", leaderTerm > replicaTerm);
+ }
+ }
+ }
+
/***
* Tests that FORCELEADER can get an active leader after leader puts all replicas in LIR and itself goes down,
* hence resulting in a leaderless shard.