You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/12/29 03:49:12 UTC

[5/5] lucene-solr:jira/solr-11702: SOLR-11702: RecoveryStrategy should ping leader first

SOLR-11702: RecoveryStrategy should ping leader first


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/80044bce
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/80044bce
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/80044bce

Branch: refs/heads/jira/solr-11702
Commit: 80044bce46a5980e14eb99a1e9b0dbe05cc9f2d9
Parents: 866a073
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 10:48:57 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 10:48:57 2017 +0700

----------------------------------------------------------------------
 .../org/apache/solr/cloud/RecoveryStrategy.java | 51 ++++++++++++++------
 1 file changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/80044bce/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 944ae4f..2482123 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -19,6 +19,10 @@ package org.apache.solr.cloud;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -516,21 +520,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
       zkController.stopReplicationFromLeader(coreName);
     }
 
+    final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
     Future<RecoveryInfo> replayFuture = null;
     while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
       try {
         CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
-        ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
-            cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      
-        final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
-        final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
-        String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
-        String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+        final Replica leader = checkConnectionToLeader(ourUrl, cloudDesc);
+        if (isClosed()) {
+          LOG.info("RecoveryStrategy has been closed");
+          break;
+        }
 
-        boolean isLeader = leaderUrl.equals(ourUrl);
+        boolean isLeader = leader.getCoreUrl().equals(ourUrl);
         if (isLeader && !cloudDesc.isLeader()) {
           throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
         }
@@ -548,7 +549,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         ulog.bufferUpdates();
         replayed = false;
         
-        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+        LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
             ourUrl);
         zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
         
@@ -567,7 +568,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
           break;
         }
 
-        sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+        sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
         
         if (isClosed()) {
           LOG.info("RecoveryStrategy has been closed");
@@ -587,11 +588,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
         // first thing we just try to sync
         if (firstTime) {
           firstTime = false; // only try sync the first time through the loop
-          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+          LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
           // System.out.println("Attempting to PeerSync from " + leaderUrl
           // + " i am:" + zkController.getNodeName());
           PeerSync peerSync = new PeerSync(core,
-              Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+              Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
           peerSync.setStartingVersions(recentVersions);
           boolean syncSuccess = peerSync.sync().isSuccess();
           if (syncSuccess) {
@@ -625,7 +626,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
         try {
 
-          replicate(zkController.getNodeName(), core, leaderprops);
+          replicate(zkController.getNodeName(), core, leader);
 
           if (isClosed()) {
             LOG.info("RecoveryStrategy has been closed");
@@ -747,6 +748,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
     LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
   }
 
+  private Replica checkConnectionToLeader(String ourUrl, CloudDescriptor cloudDesc) throws InterruptedException {
+    while (true) {
+      final Replica leaderReplica = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+      if (isClosed()) {
+        return leaderReplica;
+      }
+      if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+        return leaderReplica;
+      }
+      try (Socket socket = new Socket()) {
+        URL url = new URL(leaderReplica.getBaseUrl());
+        socket.connect(new InetSocketAddress(url.getHost(), url.getPort()), 2000);
+        return leaderReplica;
+      } catch (IOException e) {
+        LOG.info("Failed to connect leader {}, try again", leaderReplica.getBaseUrl());
+      }
+    }
+  }
+
   public static Runnable testing_beforeReplayBufferingUpdates;
 
   final private Future<RecoveryInfo> replay(SolrCore core)