You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/12/29 03:49:12 UTC
[5/5] lucene-solr:jira/solr-11702: SOLR-11702: RecoveryStrategy
should ping leader first
SOLR-11702: RecoveryStrategy should ping leader first
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/80044bce
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/80044bce
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/80044bce
Branch: refs/heads/jira/solr-11702
Commit: 80044bce46a5980e14eb99a1e9b0dbe05cc9f2d9
Parents: 866a073
Author: Cao Manh Dat <da...@apache.org>
Authored: Fri Dec 29 10:48:57 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Fri Dec 29 10:48:57 2017 +0700
----------------------------------------------------------------------
.../org/apache/solr/cloud/RecoveryStrategy.java | 51 ++++++++++++++------
1 file changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/80044bce/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 944ae4f..2482123 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -19,6 +19,10 @@ package org.apache.solr.cloud;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -516,21 +520,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.stopReplicationFromLeader(coreName);
}
+ final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
- ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
- cloudDesc.getCollectionName(), cloudDesc.getShardId());
-
- final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
- final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
- String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
- String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+ final Replica leader = checkConnectionToLeader(ourUrl, cloudDesc);
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
- boolean isLeader = leaderUrl.equals(ourUrl);
+ boolean isLeader = leader.getCoreUrl().equals(ourUrl);
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
@@ -548,7 +549,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
ulog.bufferUpdates();
replayed = false;
- LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+ LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
ourUrl);
zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@@ -567,7 +568,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
break;
}
- sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+ sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@@ -587,11 +588,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
- LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+ LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
- Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+ Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync().isSuccess();
if (syncSuccess) {
@@ -625,7 +626,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
- replicate(zkController.getNodeName(), core, leaderprops);
+ replicate(zkController.getNodeName(), core, leader);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@@ -747,6 +748,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
+ /**
+  * Looks up the current shard leader and returns it once a TCP connection to the
+  * leader's base URL can be opened, retrying until that succeeds.
+  *
+  * <p>The leader is returned without any connection attempt when this strategy has
+  * been closed or when the leader's core URL equals {@code ourUrl}; callers should
+  * re-check {@code isClosed()} after this method returns.
+  *
+  * @param ourUrl core URL of this replica, compared against the leader's core URL
+  * @param cloudDesc supplies the collection name and shard id used for the leader lookup
+  * @return the leader replica found by the most recent lookup
+  * @throws InterruptedException if interrupted during the leader lookup or while
+  *         waiting between connection attempts
+  */
+ private Replica checkConnectionToLeader(String ourUrl, CloudDescriptor cloudDesc) throws InterruptedException {
+   final int connectTimeoutMs = 2000;
+   final long retryDelayMs = 1000;
+   while (true) {
+     final Replica leaderReplica = zkStateReader.getLeaderRetry(
+         cloudDesc.getCollectionName(), cloudDesc.getShardId());
+     // Nothing to probe if we were closed meanwhile or if we are the leader ourselves.
+     if (isClosed() || leaderReplica.getCoreUrl().equals(ourUrl)) {
+       return leaderReplica;
+     }
+     try (Socket socket = new Socket()) {
+       final URL url = new URL(leaderReplica.getBaseUrl());
+       // URL.getPort() is -1 when the URL carries no explicit port; InetSocketAddress
+       // rejects that with an unchecked IllegalArgumentException, so fall back to the
+       // protocol's default port.
+       final int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
+       socket.connect(new InetSocketAddress(url.getHost(), port), connectTimeoutMs);
+       return leaderReplica;
+     } catch (IOException e) {
+       // Pass the exception along so the reason for the failed attempt is not lost.
+       LOG.info("Failed to connect leader {}, try again", leaderReplica.getBaseUrl(), e);
+     }
+     // A refused connection fails immediately; pause so the retry loop does not spin.
+     Thread.sleep(retryDelayMs);
+   }
+ }
+
public static Runnable testing_beforeReplayBufferingUpdates;
final private Future<RecoveryInfo> replay(SolrCore core)