You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/11 07:42:25 UTC

lucene-solr:jira/solr-11702: SOLR-11702: Support ForceLeaderAPI

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-11702 583a4a549 -> c16385142


SOLR-11702: Support ForceLeaderAPI


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c1638514
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c1638514
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c1638514

Branch: refs/heads/jira/solr-11702
Commit: c163851429779c2e25423da34103dc43b7b493b9
Parents: 583a4a5
Author: Cao Manh Dat <da...@apache.org>
Authored: Thu Jan 11 14:42:10 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Thu Jan 11 14:42:10 2018 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ElectionContext.java  |  17 +--
 .../org/apache/solr/cloud/ZkShardTerms.java     |   7 +-
 .../solr/handler/admin/CollectionsHandler.java  |  45 ++++--
 .../org/apache/solr/cloud/ForceLeaderTest.java  | 145 +++++++++++++++++++
 4 files changed, 192 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index d6efd1d..6302fe2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -313,14 +313,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
       }
       coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
-      if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
-          && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
-        // no need to do recovery here, we already has term less than leader term, the recovery will be triggered later
-        log.info("Can not become leader, this core has term less than leader's term");
-        cancelElection();
-        leaderElector.joinElection(this, true);
-        return;
-      }
       MDCLoggingContext.setCore(core);
       lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
     }
@@ -765,7 +757,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
       // to make sure others participate in sync and leader election, we can be leader
       return true;
     }
-    
+
+    String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+    if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+        && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+      log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
+      return false;
+    }
+
     if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
       log.debug("My last published State was Active, it's okay to be the leader.");
       return true;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 69ae990..335a07b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -181,13 +181,18 @@ public class ZkShardTerms implements AutoCloseable{
    * Set a replica's term equals to leader's term
    * @param coreNodeName of the replica
    */
-  void setEqualsToMax(String coreNodeName) {
+  public void setEqualsToMax(String coreNodeName) {
     Terms newTerms;
     while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms)) break;
     }
   }
 
+  public long getTerm(String coreNodeName) {
+    Long term = terms.getTerm(coreNodeName);
+    return term == null? -1 : term;
+  }
+
   // package private for testing, only used by tests
   int getNumListeners() {
     synchronized (listeners) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index aba32b4..c05afa4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,8 +28,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.io.IOUtils;
@@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.cloud.rule.ReplicaAssigner;
 import org.apache.solr.cloud.rule.Rule;
@@ -959,7 +962,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
   }
 
   private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
-    ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+    ZkController zkController = handler.coreContainer.getZkController();
+    ClusterState clusterState = zkController.getClusterState();
     String collectionName = req.getParams().required().get(COLLECTION_PROP);
     String sliceId = req.getParams().required().get(SHARD_ID_PROP);
 
@@ -971,7 +975,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
           "No shard with name " + sliceId + " exists for collection " + collectionName);
     }
 
-    try {
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
       // if an active replica is the leader, then all is fine already
       Replica leader = slice.getLeader();
       if (leader != null && leader.getState() == State.ACTIVE) {
@@ -989,20 +993,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         handler.coreContainer.getZkController().getZkClient().clean(lirPath);
       }
 
+      final Set<String> liveNodes = clusterState.getLiveNodes();
+      List<Replica> liveReplicas = slice.getReplicas().stream()
+          .filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
+      boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
+          .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
+      // we won't increase replicas' terms if there is a live replica whose term equals the leader's term
+      if (shouldIncreaseReplicaTerms) {
+        OptionalLong optionalMaxTerm = liveReplicas.stream()
+            .filter(rep -> zkShardTerms.registered(rep.getName()))
+            .mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
+            .max();
+        // and raise the terms of the least out-of-sync live replicas, i.e. those holding the highest term
+        if (optionalMaxTerm.isPresent()) {
+          liveReplicas.stream()
+              .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
+              .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+        }
+      }
+
       // Call all live replicas to prepare themselves for leadership, e.g. set last published
       // state to active.
-      for (Replica rep : slice.getReplicas()) {
-        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
-          ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+      for (Replica rep : liveReplicas) {
+        ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
 
-          ModifiableSolrParams params = new ModifiableSolrParams();
-          params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
-          params.set(CoreAdminParams.CORE, rep.getStr("core"));
-          String nodeName = rep.getNodeName();
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+        params.set(CoreAdminParams.CORE, rep.getStr("core"));
+        String nodeName = rep.getNodeName();
 
-          OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
-              CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
-        }
+        OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+            CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
       }
 
       // Wait till we have an active leader

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c1638514/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 9ce3036..49b243f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -59,6 +59,11 @@ public class ForceLeaderTest extends HttpPartitionTest {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private final boolean onlyLeaderIndexes = random().nextBoolean();
 
+  @Override
+  protected boolean useTlogReplicas() {
+    return onlyLeaderIndexes;
+  }
+
   @Test
   @Override
   @Ignore
@@ -66,6 +71,146 @@ public class ForceLeaderTest extends HttpPartitionTest {
 
   }
 
+  /**
+   * Tests that FORCELEADER can get an active leader even when the only live replicas have terms lower than the leader's term
+   */
+  @Test
+  @Slow
+  public void testReplicasInLowerTerms() throws Exception {
+    handle.put("maxScore", SKIPVAL);
+    handle.put("timestamp", SKIPVAL);
+
+    String testCollectionName = "forceleader_lower_terms_collection";
+    createCollection(testCollectionName, "conf1", 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    try {
+      List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+      assertEquals("Expected 2 replicas for collection " + testCollectionName
+          + " but found " + notLeaders.size() + "; clusterState: "
+          + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+      Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+      JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+      ZkController zkController = notLeader0.getCoreContainer().getZkController();
+
+      log.info("Before put non leaders into lower term: " + printClusterStateInfo());
+      putNonLeadersIntoLowerTerm(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+      cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+      ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+      int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+          "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+      int numReplicasOnLiveNodes = 0;
+      for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
+        if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+          numReplicasOnLiveNodes++;
+        }
+      }
+      assertEquals(2, numReplicasOnLiveNodes);
+      log.info("Before forcing leader: " + printClusterStateInfo());
+      // Assert there is no leader yet
+      assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
+          clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
+
+      assertSendDocFails(3);
+
+      log.info("Do force leader...");
+      doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+      // By now we have an active leader. Wait for recoveries to finish
+      waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+      cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+      clusterState = cloudClient.getZkStateReader().getClusterState();
+      log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
+      // we have a leader
+      Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
+      assertNotNull(newLeader);
+      // leader is active
+      assertEquals(State.ACTIVE, newLeader.getState());
+
+      numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+      assertEquals(2, numActiveReplicas);
+
+      // Assert that indexing works again
+      log.info("Sending doc 4...");
+      sendDoc(4);
+      log.info("Committing...");
+      cloudClient.commit();
+      log.info("Doc 4 sent and commit issued");
+
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+      // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+      log.info("Checking doc counts...");
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      params.add("q", "*:*");
+      assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+      bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+    } finally {
+      log.info("Cleaning up after the test.");
+      // try to clean up
+      attemptCollectionDelete(cloudClient, testCollectionName);
+    }
+  }
+
+  void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+    SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+    for (int i = 0; i < notLeaders.size(); i++)
+      nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+
+    sendDoc(1);
+
+    // ok, now introduce a network partition between the leader and both replicas
+    log.info("Closing proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.close();
+    getProxyForReplica(leader).close();
+
+    // indexing during a partition
+    log.info("Sending a doc during the network partition...");
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    sendDoc(2, null, leaderJetty);
+
+    // Kill the leader
+    log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName() + "");
+    leaderJetty.stop();
+
+    // Wait for a steady state, till the shard is leaderless
+    log.info("Sleep and periodically wake up to check for state...");
+    for (int i = 0; i < 20; i++) {
+      ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+      boolean allRecoveries = true;
+      for (Replica notLeader : notLeaders) {
+        if (clusterState.getCollection(collectionName).getReplica(notLeader.getName()).getState() != State.RECOVERING) {
+          allRecoveries = false;
+          break;
+        }
+      }
+      if (allRecoveries && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    log.info("Waking up...");
+
+    // remove the network partition
+    log.info("Reopening the proxies for the non-leader replicas...");
+    for (SocketProxy proxy : nonLeaderProxies)
+      proxy.reopen();
+
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, shard, cloudClient.getZkStateReader().getZkClient())) {
+      for (Replica notLeader : notLeaders) {
+        assertTrue(zkShardTerms.getTerm(leader.getName()) > zkShardTerms.getTerm(notLeader.getName()));
+      }
+    }
+  }
+
   /***
    * Tests that FORCELEADER can get an active leader after leader puts all replicas in LIR and itself goes down,
    * hence resulting in a leaderless shard.