You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/03 10:52:06 UTC
[18/18] lucene-solr:jira/solr-11702: SOLR-11702: Replica should check
the connection between itself and the leader before doing recovery
SOLR-11702: Replica should check the connection between itself and the leader before doing recovery
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3e6afede
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3e6afede
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3e6afede
Branch: refs/heads/jira/solr-11702
Commit: 3e6afede46d9824bea07a6df87214fa991c7c209
Parents: 764c1f2
Author: Cao Manh Dat <da...@apache.org>
Authored: Wed Jan 3 17:51:31 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Wed Jan 3 17:51:31 2018 +0700
----------------------------------------------------------------------
.../org/apache/solr/cloud/RecoveryStrategy.java | 52 ++++++++-
.../apache/solr/cloud/HttpPartitionTest.java | 107 ++++++++++++-------
2 files changed, 120 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3e6afede/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 95ba3f1..63dfe19 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -521,8 +523,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
- final Replica leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId());
- //TODO we should try to ping to the leader to ensure the network is healthy
+ final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
+
boolean isLeader = leader.getCoreUrl().equals(ourUrl);
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
@@ -738,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
+  /**
+   * Looks up the current shard leader and pings it, retrying until the leader answers.
+   *
+   * <p>The method returns without pinging when this strategy has been closed or when this core
+   * is itself the leader. A ping failure that is an {@link IOException} (thrown directly or set
+   * as the cause of another exception) is treated as a connectivity problem and retried after a
+   * 500 ms pause; any other ping failure ends the wait and the leader is returned as-is.
+   *
+   * @param ourUrl core URL of this replica, compared against the leader's core URL
+   * @param coreDesc descriptor of the core that is recovering
+   * @param mayPutReplicaAsDown when true, a replica that is still ACTIVE is published as DOWN
+   *        right before the second attempt
+   * @return the leader replica returned by the most recent {@code getLeaderRetry} call
+   * @throws Exception propagated from the cluster-state lookup, the leader lookup, the state
+   *         publish, or from being interrupted during the retry pause
+   */
+  private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+    int numTried = 0;
+    while (true) {
+      CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+      DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+      // Only before the second attempt, and never once this strategy has been closed.
+      if (mayPutReplicaAsDown && numTried == 1 && !isClosed()) {
+        // The replica may be absent from the cluster state; skip the publish instead of failing with an NPE.
+        Replica ourReplica = docCollection.getReplica(cloudDesc.getCoreNodeName());
+        if (ourReplica != null && ourReplica.getState() == Replica.State.ACTIVE) {
+          // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
+          zkController.publish(coreDesc, Replica.State.DOWN);
+        }
+      }
+      numTried++;
+      final Replica leaderReplica = zkStateReader.getLeaderRetry(
+          cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+      // Nothing to ping when we were closed meanwhile, or when we are the leader ourselves.
+      if (isClosed() || leaderReplica.getCoreUrl().equals(ourUrl)) {
+        return leaderReplica;
+      }
+
+      try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
+          .withSocketTimeout(1000)
+          .withConnectionTimeout(1000)
+          .build()) {
+        // Only reachability matters here; the response content is not inspected.
+        SolrPingResponse resp = httpSolrClient.ping();
+        return leaderReplica;
+      } catch (Exception e) {
+        boolean connectionProblem = e instanceof IOException || e.getCause() instanceof IOException;
+        if (!connectionProblem) {
+          // The leader was reached but the ping failed for another reason; let recovery proceed.
+          return leaderReplica;
+        }
+        LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+        Thread.sleep(500);
+      }
+    }
+  }
+
public static Runnable testing_beforeReplayBufferingUpdates;
final private Future<RecoveryInfo> replay(SolrCore core)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3e6afede/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index ff083cd..24f4ff6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -46,10 +47,13 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,6 +152,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
sendDoc(1, 2);
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 non-leader replicas for collection " + testCollectionName
@@ -160,27 +165,21 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// Now introduce a network partition between the leader and 1 replica, so a minRf of 2 is still achieved
log.info("partitioning replica : " + notLeaders.get(0));
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
+ // the leader can still connect to replica 2; with leaderProxy closed, replica 1 cannot do recovery
+ leaderProxy.close();
// indexing during a partition
- int achievedRf = sendDoc(2, 2);
+ int achievedRf = sendDoc(2, 2, leaderJetty);
assertEquals("Unexpected achieved replication factor", 2, achievedRf);
-
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
+ assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
+ }
Thread.sleep(sleepMsBeforeHealPartition);
-
- // Verify that the partitioned replica is DOWN
- ZkStateReader zkr = cloudClient.getZkStateReader();
- zkr.forceUpdateCollection(testCollectionName);; // force the state to be fresh
- ClusterState cs = zkr.getClusterState();
- Collection<Slice> slices = cs.getCollection(testCollectionName).getActiveSlices();
- Slice slice = slices.iterator().next();
- Replica partitionedReplica = slice.getReplica(notLeaders.get(0).getName());
- assertEquals("The partitioned replica did not go into recovering",
- Replica.State.RECOVERING.toString(), partitionedReplica.getStr(ZkStateReader.STATE_PROP));
- log.info("un-partitioning replica : " + notLeaders.get(0));
-
proxy0.reopen();
+ leaderProxy.reopen();
notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -193,8 +192,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
proxy0.close();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
proxy1.close();
+ leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
+ leaderProxy.close();
- achievedRf = sendDoc(3, 2);
+ achievedRf = sendDoc(3, 2, leaderJetty);
assertEquals("Unexpected achieved replication factor", 1, achievedRf);
Thread.sleep(sleepMsBeforeHealPartition);
@@ -204,6 +205,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
proxy0.reopen();
proxy1.reopen();
+ leaderProxy.reopen();
notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -238,30 +240,27 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
Replica notLeader =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
-
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
+
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy = getProxyForReplica(notLeader);
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy.close();
-
+ leaderProxy.close();
+
// indexing during a partition
- sendDoc(2);
-
- // Have the partition last at least 1 sec
- // While this gives the impression that recovery is timing related, this is
- // really only
- // to give time for the state to be written to ZK before the test completes.
- // In other words,
- // without a brief pause, the test finishes so quickly that it doesn't give
- // time for the recovery process to kick-in
- Thread.sleep(sleepMsBeforeHealPartition);
+ sendDoc(2, null, leaderJetty);
+ // replica should publish itself as DOWN if the network is not healed after some amount of time
+ waitForDownState(testCollectionName, notLeader.getName(), 10000);
proxy.reopen();
+ leaderProxy.reopen();
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
- int achievedRf = sendDoc(3, 2);
+ int achievedRf = sendDoc(3);
if (achievedRf == 1) {
// this case can happen when leader reuse an connection get established before network partition
// TODO: Remove when SOLR-11776 get committed
@@ -293,21 +292,25 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
if (d % 10 == 0) {
if (hasPartition) {
proxy.reopen();
+ leaderProxy.reopen();
hasPartition = false;
} else {
if (d >= 10) {
proxy.close();
+ leaderProxy.close();
hasPartition = true;
Thread.sleep(sleepMsBeforeHealPartition);
}
}
}
- sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
+ // always send doc directly to leader without going through proxy
+ sendDoc(d + 4, null, leaderJetty); // 4 is offset as we've already indexed 1-3
}
// restore connectivity if lost
if (hasPartition) {
proxy.reopen();
+ leaderProxy.reopen();
}
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
@@ -328,7 +331,24 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
-
+
+  /**
+   * Polls the cluster state until the given replica reports {@code DOWN}, failing the test if
+   * that has not happened when the timeout expires.
+   *
+   * @param collection name of the collection whose state is refreshed and inspected
+   * @param replicaName name of the replica looked up in the first active slice
+   * @param ms maximum time to wait, in milliseconds
+   * @throws KeeperException propagated from the forced cluster-state refresh
+   * @throws InterruptedException if interrupted during the refresh or between polls
+   */
+  private void waitForDownState(String collection, String replicaName, long ms) throws KeeperException, InterruptedException {
+    TimeOut timeOut = new TimeOut(ms, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
+    Replica.State replicaState = Replica.State.ACTIVE;
+    while (!timeOut.hasTimedOut()) {
+      ZkStateReader zkr = cloudClient.getZkStateReader();
+      zkr.forceUpdateCollection(collection); // force the state to be fresh
+      ClusterState cs = zkr.getClusterState();
+      Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
+      Slice slice = slices.iterator().next();
+      Replica partitionedReplica = slice.getReplica(replicaName);
+      replicaState = partitionedReplica.getState();
+      if (replicaState == Replica.State.DOWN) return;
+      // Pause between polls so the loop does not issue forced refreshes back to back.
+      Thread.sleep(100);
+    }
+    assertEquals("The partitioned replica did not published it self as down",
+        Replica.State.DOWN, replicaState);
+  }
+
protected void testRf3() throws Exception {
// create a collection that has 1 shard but 2 replicas
String testCollectionName = "c8n_1x3";
@@ -344,27 +364,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
+ leaderProxy.close();
// indexing during a partition
- sendDoc(2);
+ sendDoc(2, null, leaderJetty);
Thread.sleep(sleepMsBeforeHealPartition);
-
proxy0.reopen();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-
proxy1.close();
- sendDoc(3);
+ sendDoc(3, null, leaderJetty);
Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();
+
+ leaderProxy.reopen();
// sent 4 docs in so far, verify they are on the leader and replica
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -522,8 +545,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
protected int sendDoc(int docId) throws Exception {
return sendDoc(docId, null);
}
-
+
+ // Send doc directly to a server (without going through proxy)
+ protected int sendDoc(int docId, Integer minRf, JettySolrRunner leaderJetty) throws IOException, SolrServerException {
+ try (HttpSolrClient solrClient = new HttpSolrClient.Builder(leaderJetty.getBaseUrl().toString()).build()) {
+ return sendDoc(docId, minRf, solrClient, cloudClient.getDefaultCollection());
+ }
+ }
+
protected int sendDoc(int docId, Integer minRf) throws Exception {
+ return sendDoc(docId, minRf, cloudClient, cloudClient.getDefaultCollection());
+ }
+
+ protected int sendDoc(int docId, Integer minRf, SolrClient solrClient, String collection) throws IOException, SolrServerException {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, String.valueOf(docId));
doc.addField("a_t", "hello" + docId);
@@ -533,8 +567,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
}
up.add(doc);
-
- return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
+ return cloudClient.getMinAchievedReplicationFactor(collection, solrClient.request(up, collection));
}
/**