You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/03 10:52:06 UTC

[18/18] lucene-solr:jira/solr-11702: SOLR-11702: Replica should check the connection between its and leader before doing recovery

SOLR-11702: Replica should check the connection between its and leader before doing recovery


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3e6afede
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3e6afede
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3e6afede

Branch: refs/heads/jira/solr-11702
Commit: 3e6afede46d9824bea07a6df87214fa991c7c209
Parents: 764c1f2
Author: Cao Manh Dat <da...@apache.org>
Authored: Wed Jan 3 17:51:31 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Wed Jan 3 17:51:31 2018 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/RecoveryStrategy.java |  52 ++++++++-
 .../apache/solr/cloud/HttpPartitionTest.java    | 107 ++++++++++++-------
 2 files changed, 120 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3e6afede/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 95ba3f1..63dfe19 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -521,8 +523,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
     while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        final Replica leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId());
-        //TODO we should try to ping to the leader to ensure the network is healthy
+        final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
+
         boolean isLeader = leader.getCoreUrl().equals(ourUrl);
         if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
@@ -738,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+    int numTried = 0;
+    while (true) {
+      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+      if (mayPutReplicaAsDown && numTried == 1 &&
+          docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+        // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
+        zkController.publish(coreDesc, Replica.State.DOWN);
+      }
+      numTried++;
+      final Replica leaderReplica = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+      if (isClosed()) {
+        return leaderReplica;
+      }
+
+      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+        return leaderReplica;
+      }
+
+      try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
+          .withSocketTimeout(1000)
+          .withConnectionTimeout(1000)
+          .build()) {
+        SolrPingResponse resp = httpSolrClient.ping();
+        return leaderReplica;
+      } catch (IOException e) {
+        LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+        Thread.sleep(500);
+      } catch (Exception e) {
+        if (e.getCause() instanceof IOException) {
+          LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+          Thread.sleep(500);
+        } else {
+          return leaderReplica;
+        }
+      }
+    }
+  }
+
   public static Runnable testing_beforeReplayBufferingUpdates;
 
   final private Future<RecoveryInfo> replay(SolrCore core)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3e6afede/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index ff083cd..24f4ff6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -46,10 +47,13 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -148,6 +152,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
     sendDoc(1, 2);
 
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
     List<Replica> notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
     assertTrue("Expected 2 non-leader replicas for collection " + testCollectionName
@@ -160,27 +165,21 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     // Now introduce a network partition between the leader and 1 replica, so a minRf of 2 is still achieved
     log.info("partitioning replica :  " + notLeaders.get(0));
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+    SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
 
     proxy0.close();
+    // leader still can connect to replica 2, by closing leaderProxy, replica 1 can not do recovery
+    leaderProxy.close();
 
     // indexing during a partition
-    int achievedRf = sendDoc(2, 2);
+    int achievedRf = sendDoc(2, 2, leaderJetty);
     assertEquals("Unexpected achieved replication factor", 2, achievedRf);
-
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
+      assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
+    }
     Thread.sleep(sleepMsBeforeHealPartition);
-
-    // Verify that the partitioned replica is DOWN
-    ZkStateReader zkr = cloudClient.getZkStateReader();
-    zkr.forceUpdateCollection(testCollectionName);; // force the state to be fresh
-    ClusterState cs = zkr.getClusterState();
-    Collection<Slice> slices = cs.getCollection(testCollectionName).getActiveSlices();
-    Slice slice = slices.iterator().next();
-    Replica partitionedReplica = slice.getReplica(notLeaders.get(0).getName());
-    assertEquals("The partitioned replica did not go into recovering",
-        Replica.State.RECOVERING.toString(), partitionedReplica.getStr(ZkStateReader.STATE_PROP));
-    log.info("un-partitioning replica :  " + notLeaders.get(0));
-
     proxy0.reopen();
+    leaderProxy.reopen();
 
     notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -193,8 +192,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     proxy0.close();
     SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
     proxy1.close();
+    leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
+    leaderProxy.close();
 
-    achievedRf = sendDoc(3, 2);
+    achievedRf = sendDoc(3, 2, leaderJetty);
     assertEquals("Unexpected achieved replication factor", 1, achievedRf);
 
     Thread.sleep(sleepMsBeforeHealPartition);
@@ -204,6 +205,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
 
     proxy0.reopen();
     proxy1.reopen();
+    leaderProxy.reopen();
 
     notLeaders =
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -238,30 +240,27 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     
     Replica notLeader = 
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
-    
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
+
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy = getProxyForReplica(notLeader);
+    SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
     
     proxy.close();
-    
+    leaderProxy.close();
+
     // indexing during a partition
-    sendDoc(2);
-    
-    // Have the partition last at least 1 sec
-    // While this gives the impression that recovery is timing related, this is
-    // really only
-    // to give time for the state to be written to ZK before the test completes.
-    // In other words,
-    // without a brief pause, the test finishes so quickly that it doesn't give
-    // time for the recovery process to kick-in
-    Thread.sleep(sleepMsBeforeHealPartition);
+    sendDoc(2, null, leaderJetty);
+    // replica should publish itself as DOWN if the network is not healed after some amount time
+    waitForDownState(testCollectionName, notLeader.getName(), 10000);
     
     proxy.reopen();
+    leaderProxy.reopen();
     
     List<Replica> notLeaders = 
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
     
-    int achievedRf = sendDoc(3, 2);
+    int achievedRf = sendDoc(3);
     if (achievedRf == 1) {
       // this case can happen when leader reuse an connection get established before network partition
       // TODO: Remove when SOLR-11776 get committed
@@ -293,21 +292,25 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       if (d % 10 == 0) {
         if (hasPartition) {
           proxy.reopen();
+          leaderProxy.reopen();
           hasPartition = false;
         } else {
           if (d >= 10) {
             proxy.close();
+            leaderProxy.close();
             hasPartition = true;
             Thread.sleep(sleepMsBeforeHealPartition);
           }
         }
       }
-      sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
+      // always send doc directly to leader without going through proxy
+      sendDoc(d + 4, null, leaderJetty); // 4 is offset as we've already indexed 1-3
     }
     
     // restore connectivity if lost
     if (hasPartition) {
       proxy.reopen();
+      leaderProxy.reopen();
     }
     
     notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
@@ -328,7 +331,24 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     // try to clean up
     attemptCollectionDelete(cloudClient, testCollectionName);
   }
-  
+
+  private void waitForDownState(String collection, String replicaName, long ms) throws KeeperException, InterruptedException {
+    TimeOut timeOut = new TimeOut(ms, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
+    Replica.State replicaState = Replica.State.ACTIVE;
+    while (!timeOut.hasTimedOut()) {
+      ZkStateReader zkr = cloudClient.getZkStateReader();
+      zkr.forceUpdateCollection(collection);; // force the state to be fresh
+      ClusterState cs = zkr.getClusterState();
+      Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
+      Slice slice = slices.iterator().next();
+      Replica partitionedReplica = slice.getReplica(replicaName);
+      replicaState = partitionedReplica.getState();
+      if (replicaState == Replica.State.DOWN) return;
+    }
+    assertEquals("The partitioned replica did not published it self as down",
+        Replica.State.DOWN, replicaState);
+  }
+
   protected void testRf3() throws Exception {
     // create a collection that has 1 shard but 2 replicas
     String testCollectionName = "c8n_1x3";
@@ -344,27 +364,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
         + " but found " + notLeaders.size() + "; clusterState: "
         + printClusterStateInfo(testCollectionName),
         notLeaders.size() == 2);
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
 
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+    SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
     
     proxy0.close();
+    leaderProxy.close();
     
     // indexing during a partition
-    sendDoc(2);
+    sendDoc(2, null, leaderJetty);
     
     Thread.sleep(sleepMsBeforeHealPartition);
-    
     proxy0.reopen();
     
     SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-    
     proxy1.close();
     
-    sendDoc(3);
+    sendDoc(3, null, leaderJetty);
     
     Thread.sleep(sleepMsBeforeHealPartition);
     proxy1.reopen();
+
+    leaderProxy.reopen();
     
     // sent 4 docs in so far, verify they are on the leader and replica
     notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive); 
@@ -522,8 +545,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   protected int sendDoc(int docId) throws Exception {
     return sendDoc(docId, null);
   }
-  
+
+  // Send doc directly to a server (without going through proxy)
+  protected int sendDoc(int docId, Integer minRf, JettySolrRunner leaderJetty) throws IOException, SolrServerException {
+    try (HttpSolrClient solrClient = new HttpSolrClient.Builder(leaderJetty.getBaseUrl().toString()).build()) {
+      return sendDoc(docId, minRf, solrClient, cloudClient.getDefaultCollection());
+    }
+  }
+
   protected int sendDoc(int docId, Integer minRf) throws Exception {
+    return sendDoc(docId, minRf, cloudClient, cloudClient.getDefaultCollection());
+  }
+
+  protected int sendDoc(int docId, Integer minRf, SolrClient solrClient, String collection) throws IOException, SolrServerException {
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField(id, String.valueOf(docId));
     doc.addField("a_t", "hello" + docId);
@@ -533,8 +567,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
       up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
     }
     up.add(doc);
-
-    return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
+    return cloudClient.getMinAchievedReplicationFactor(collection, solrClient.request(up, collection));
   }
 
   /**