You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2014/09/24 17:21:47 UTC
svn commit: r1627347 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/cloud/
test-framework/src/java/org/apache/solr/cloud/
Author: thelabdude
Date: Wed Sep 24 15:21:47 2014
New Revision: 1627347
URL: http://svn.apache.org/r1627347
Log:
SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread; refactor HttpPartitionTest to resolve jenkins failures.
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java (with props)
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
- copied unchanged from r1627328, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
Removed:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep 24 15:21:47 2014
@@ -201,6 +201,8 @@ Bug Fixes
* SOLR-6509: Solr start scripts interactive mode doesn't honor -z argument (Timothy Potter)
+* SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)
+
Other Changes
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep 24 15:21:47 2014
@@ -387,7 +387,8 @@ final class ShardLeaderElectionContext e
collection,
shardId,
coreNodeProps,
- 120);
+ 120,
+ coreNodeName);
zkController.ensureReplicaInLeaderInitiatedRecovery(
collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Sep 24 15:21:47 2014
@@ -50,13 +50,15 @@ public class LeaderInitiatedRecoveryThre
protected String shardId;
protected ZkCoreNodeProps nodeProps;
protected int maxTries;
+ protected String leaderCoreNodeName;
public LeaderInitiatedRecoveryThread(ZkController zkController,
CoreContainer cc,
String collection,
String shardId,
ZkCoreNodeProps nodeProps,
- int maxTries)
+ int maxTries,
+ String leaderCoreNodeName)
{
super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
this.zkController = zkController;
@@ -65,6 +67,7 @@ public class LeaderInitiatedRecoveryThre
this.shardId = shardId;
this.nodeProps = nodeProps;
this.maxTries = maxTries;
+ this.leaderCoreNodeName = leaderCoreNodeName;
setDaemon(true);
}
@@ -103,7 +106,7 @@ public class LeaderInitiatedRecoveryThre
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreNeedingRecovery);
- while (continueTrying && ++tries < maxTries) {
+ while (continueTrying && ++tries <= maxTries) {
if (tries > 1) {
log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
" to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
@@ -150,7 +153,7 @@ public class LeaderInitiatedRecoveryThre
if (coreContainer.isShutDown()) {
log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
- + replicaNodeName + " because my core container is close.", coreNeedingRecovery, replicaCoreNodeName);
+ + replicaNodeName + " because my core container is closed.", coreNeedingRecovery, replicaCoreNodeName);
continueTrying = false;
break;
}
@@ -170,6 +173,24 @@ public class LeaderInitiatedRecoveryThre
break;
}
+ // stop trying if I'm no longer the leader
+ if (leaderCoreNodeName != null && collection != null) {
+ String leaderCoreNodeNameFromZk = null;
+ try {
+ leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
+ } catch (Exception exc) {
+ log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
+ " " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
+ }
+ if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
+ log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+ ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+ leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
+ continueTrying = false;
+ break;
+ }
+ }
+
// additional safeguard against the replica trying to be in the active state
// before acknowledging the leader initiated recovery command
if (continueTrying && collection != null && shardId != null) {
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Sep 24 15:21:47 2014
@@ -595,7 +595,9 @@ public class DistributedUpdateProcessor
String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
if (fromCollection == null) {
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+ SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+ solrExc.setMetadata("cause", "LeaderChanged");
+ throw solrExc;
}
}
}
@@ -805,57 +807,92 @@ public class DistributedUpdateProcessor
DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
if (phase != DistribPhase.FROMLEADER)
continue; // don't have non-leaders try to recovery other nodes
-
- final String replicaUrl = error.req.node.getUrl();
+
+ final String replicaUrl = error.req.node.getUrl();
+
+ // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
+ String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+ if ("LeaderChanged".equals(cause)) {
+ // let's just fail this request and let the client retry? or just call processAdd again?
+ log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
+ " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+ rsp.setException(error.e);
+ break;
+ }
int maxTries = 1;
boolean sendRecoveryCommand = true;
String collection = null;
String shardId = null;
-
+
if (error.req.node instanceof StdNode) {
StdNode stdNode = (StdNode)error.req.node;
collection = stdNode.getCollection();
shardId = stdNode.getShardId();
+
+ // before we go setting other replicas to down, make sure we're still the leader!
+ String leaderCoreNodeName = null;
try {
- // if false, then the node is probably not "live" anymore
- sendRecoveryCommand =
- zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
- shardId,
- replicaUrl,
- stdNode.getNodeProps(),
- false);
-
- // we want to try more than once, ~10 minutes
- if (sendRecoveryCommand) {
- maxTries = 120;
- } // else the node is no longer "live" so no need to send any recovery command
-
- } catch (Exception e) {
- log.error("Leader failed to set replica "+
- error.req.node.getUrl()+" state to DOWN due to: "+e, e);
+ leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+ } catch (Exception exc) {
+ log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
+ " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+ }
+
+ if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
+ try {
+ // if false, then the node is probably not "live" anymore
+ sendRecoveryCommand =
+ zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
+ shardId,
+ replicaUrl,
+ stdNode.getNodeProps(),
+ false);
+
+ // we want to try more than once, ~10 minutes
+ if (sendRecoveryCommand) {
+ maxTries = 120;
+ } // else the node is no longer "live" so no need to send any recovery command
+
+ } catch (KeeperException.SessionExpiredException see) {
+ log.error("Leader failed to set replica " +
+ error.req.node.getUrl() + " state to DOWN due to: " + see, see);
+ // our session is expired, which means our state is suspect, so don't go
+ // putting other replicas in recovery (see SOLR-6511)
+ sendRecoveryCommand = false;
+ } catch (Exception e) {
+ log.error("Leader failed to set replica " +
+ error.req.node.getUrl() + " state to DOWN due to: " + e, e);
+ // will go ahead and try to send the recovery command once after this error
+ }
+ } else {
+ // not the leader anymore maybe?
+ sendRecoveryCommand = false;
+ log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
+ shardId+", no request recovery command will be sent!");
}
} // else not a StdNode, recovery command still gets sent once
if (!sendRecoveryCommand)
continue; // the replica is already in recovery handling or is not live
-
- Throwable rootCause = SolrException.getRootCause(error.e);
- log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
-
+
+ Throwable rootCause = SolrException.getRootCause(error.e);
+ log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+
// try to send the recovery command to the downed replica in a background thread
- CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
- LeaderInitiatedRecoveryThread lirThread =
+ CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+ LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(zkController,
- coreContainer,
- collection,
- shardId,
- error.req.node.getNodeProps(),
- maxTries);
+ coreContainer,
+ collection,
+ shardId,
+ error.req.node.getNodeProps(),
+ maxTries,
+ cloudDesc.getCoreNodeName()); // core node name of current leader
ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
- executor.execute(lirThread);
+ executor.execute(lirThread);
}
-
+
if (replicationTracker != null) {
rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Sep 24 15:21:47 2014
@@ -18,9 +18,6 @@ package org.apache.solr.cloud;
*/
import java.io.File;
-import java.net.ServerSocket;
-import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -30,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -39,6 +35,7 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
@@ -55,26 +52,24 @@ import org.slf4j.LoggerFactory;
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
*/
+
@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
- private static final transient Logger log =
+ protected static final transient Logger log =
LoggerFactory.getLogger(HttpPartitionTest.class);
// To prevent the test assertions firing too fast before cluster state
// recognizes (and propagates) partitions
- private static final long sleepMsBeforeHealPartition = 2000L;
-
- private static final int maxWaitSecsToSeeAllActive = 30;
-
- private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-
+ protected static final long sleepMsBeforeHealPartition = 2000L;
+
+ protected static final int maxWaitSecsToSeeAllActive = 30;
+
public HttpPartitionTest() {
super();
sliceCount = 2;
- shardCount = 2;
+ shardCount = 3;
}
@Before
@@ -87,60 +82,24 @@ public class HttpPartitionTest extends A
@Override
@After
public void tearDown() throws Exception {
- System.clearProperty("numShards");
-
try {
super.tearDown();
} catch (Exception exc) {}
resetExceptionIgnores();
-
- // close socket proxies after super.tearDown
- if (!proxies.isEmpty()) {
- for (SocketProxy proxy : proxies.values()) {
- proxy.close();
- }
- }
}
/**
- * Overrides the parent implementation so that we can configure a socket proxy
- * to sit infront of each Jetty server, which gives us the ability to simulate
- * network partitions without having to fuss with IPTables (which is not very
- * cross platform friendly).
+ * Overrides the parent implementation to install a SocketProxy in front of the Jetty server.
*/
@Override
public JettySolrRunner createJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
- throws Exception {
-
- JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
- 0, solrConfigOverride, schemaOverride, false,
- getExtraServlets(), sslConfig, getExtraRequestFilters());
- jetty.setShards(shardList);
- jetty.setDataDir(getDataDir(dataDir));
-
- // setup to proxy Http requests to this server unless it is the control
- // server
- int proxyPort = getNextAvailablePort();
- jetty.setProxyPort(proxyPort);
- jetty.start();
-
- // create a socket proxy for the jetty server ...
- SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
- proxies.put(proxy.getUrl(), proxy);
-
- return jetty;
+ throws Exception
+ {
+ return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
}
-
- protected int getNextAvailablePort() throws Exception {
- int port = -1;
- try (ServerSocket s = new ServerSocket(0)) {
- port = s.getLocalPort();
- }
- return port;
- }
-
+
@Override
public void doTest() throws Exception {
waitForThingsToLevelOut(30000);
@@ -148,12 +107,16 @@ public class HttpPartitionTest extends A
// test a 1x2 collection
testRf2();
+ waitForThingsToLevelOut(30000);
+
// now do similar for a 1x3 collection while taking 2 replicas on-and-off
// each time
testRf3();
- // kill a leader and make sure recovery occurs as expected
- testRf3WithLeaderFailover();
+ waitForThingsToLevelOut(30000);
+
+ // have the leader lose its Zk session temporarily
+ testLeaderZkSessionLoss();
}
protected void testRf2() throws Exception {
@@ -247,11 +210,9 @@ public class HttpPartitionTest extends A
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(),
+ + printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);
-
- sendDoc(1);
-
+
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
@@ -289,127 +250,112 @@ public class HttpPartitionTest extends A
log.warn("Could not delete collection {} after test completed", testCollectionName);
}
}
-
- protected void testRf3WithLeaderFailover() throws Exception {
- // now let's create a partition in one of the replicas and outright
- // kill the leader ... see what happens
- // create a collection that has 1 shard but 3 replicas
- String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
- createCollection(testCollectionName, 1, 3, 1);
+
+ // test inspired by SOLR-6511
+ protected void testLeaderZkSessionLoss() throws Exception {
+
+ String testCollectionName = "c8n_1x2_leader_session_loss";
+ createCollection(testCollectionName, 1, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);
-
- sendDoc(1);
-
- List<Replica> notLeaders =
- ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
- assertTrue("Expected 2 replicas for collection " + testCollectionName
- + " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(),
- notLeaders.size() == 2);
-
+
sendDoc(1);
-
- // ok, now introduce a network partition between the leader and the replica
- SocketProxy proxy0 = null;
- proxy0 = getProxyForReplica(notLeaders.get(0));
-
- proxy0.close();
-
- // indexing during a partition
- sendDoc(2);
-
- Thread.sleep(sleepMsBeforeHealPartition);
-
- proxy0.reopen();
-
- SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-
- proxy1.close();
-
- sendDoc(3);
-
- Thread.sleep(sleepMsBeforeHealPartition);
- proxy1.reopen();
-
- // sent 4 docs in so far, verify they are on the leader and replica
- notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-
- sendDoc(4);
-
- assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
-
- Replica leader =
+
+ List<Replica> notLeaders =
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+ assertTrue("Expected 1 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName),
+ notLeaders.size() == 1);
+
+ Replica leader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
String leaderNode = leader.getNodeName();
assertNotNull("Could not find leader for shard1 of "+
- testCollectionName+"; clusterState: "+printClusterStateInfo(), leader);
+ testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-
- // since maxShardsPerNode is 1, we're safe to kill the leader
- notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
- proxy0 = getProxyForReplica(notLeaders.get(0));
- proxy0.close();
-
- // indexing during a partition
- // doc should be on leader and 1 replica
- sendDoc(5);
- assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
- assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
-
- Thread.sleep(sleepMsBeforeHealPartition);
-
- String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+ HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField(id, String.valueOf(2));
+ doc.addField("a_t", "hello" + 2);
- // kill the leader
- leaderJetty.stop();
-
- if (leaderJetty.isRunning())
- fail("Failed to stop the leader on "+leaderNode);
-
- SocketProxy oldLeaderProxy = getProxyForReplica(leader);
- if (oldLeaderProxy != null) {
- oldLeaderProxy.close();
- } else {
- log.warn("No SocketProxy found for old leader node "+leaderNode);
- }
+ // cause leader migration by expiring the current leader's zk session
+ chaosMonkey.expireSession(leaderJetty);
- Thread.sleep(10000); // give chance for new leader to be elected.
-
- Replica newLeader =
- cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
-
- assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
- printClusterStateInfo(),newLeader);
-
- assertTrue("Expected node "+shouldNotBeNewLeaderNode+
- " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
- printClusterStateInfo(),
- !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
-
- proxy0.reopen();
-
+ String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
while (System.nanoTime() < timeout) {
- cloudClient.getZkStateReader().updateClusterState(true);
+ String currentLeaderName = null;
+ try {
+ Replica currentLeader =
+ cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ currentLeaderName = currentLeader.getName();
+ } catch (Exception exc) {}
- List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
- if (activeReps.size() == 2) break;
- Thread.sleep(1000);
+ if (expectedNewLeaderCoreNodeName.equals(currentLeaderName))
+ break; // new leader was elected after zk session expiration
+
+ Thread.sleep(500);
}
- List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
- assertTrue("Expected 2 of 3 replicas to be active but only found "+
- participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+printClusterStateInfo(),
- participatingReplicas.size() == 2);
+ Replica currentLeader =
+ cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ assertEquals(expectedNewLeaderCoreNodeName, currentLeader.getName());
+
+ log.info("Sending doc 2 to old leader "+leader.getName());
+ try {
+ leaderSolr.add(doc);
+ leaderSolr.shutdown();
- sendDoc(6);
+ Replica oldLeaderInRecovery = null;
+ for (Replica next : getActiveOrRecoveringReplicas(testCollectionName, "shard1")) {
+ if (next.getName().equals(leader.getName()) &&
+ ZkStateReader.RECOVERING.equals(next.getStr(ZkStateReader.STATE_PROP)))
+ {
+ oldLeaderInRecovery = next;
+ break;
+ }
+ }
+ // if the old leader is not active or recovering, the add should have failed
+ if (oldLeaderInRecovery != null) {
+ HttpSolrServer oldLeaderSolr = getHttpSolrServer(oldLeaderInRecovery, testCollectionName);
+ try {
+ assertDocExists(oldLeaderSolr, testCollectionName, "2");
+ } finally {
+ oldLeaderSolr.shutdown();
+ }
+ } else {
+ fail("Send doc 2 to old leader " + leader.getName() +
+ " should have failed! ClusterState: " + printClusterStateInfo(testCollectionName));
+ }
+
+ } catch (SolrException exc) {
+ // this is expected ..
+ leaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+ try {
+ leaderSolr.add(doc); // this should work
+ } finally {
+ leaderSolr.shutdown();
+ }
+ }
+
+ List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
Set<String> replicasToCheck = new HashSet<>();
for (Replica stillUp : participatingReplicas)
replicasToCheck.add(stillUp.getName());
waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
- assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+ assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 2);
+
+ // try to clean up
+ try {
+ CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+ req.setCollectionName(testCollectionName);
+ req.process(cloudClient);
+ } catch (Exception e) {
+ // don't fail the test
+ log.warn("Could not delete collection {} after test completed", testCollectionName);
+ }
}
protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {
@@ -431,21 +377,7 @@ public class HttpPartitionTest extends A
replicas.addAll(activeReplicas.values());
return replicas;
}
-
- protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
- String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
- assertNotNull(replicaBaseUrl);
- URL baseUrl = new URL(replicaBaseUrl);
-
- SocketProxy proxy = proxies.get(baseUrl.toURI());
- if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
- baseUrl = new URL(baseUrl.toExternalForm() + "/");
- proxy = proxies.get(baseUrl.toURI());
- }
- assertNotNull("No proxy found for " + baseUrl + "!", proxy);
- return proxy;
- }
-
+
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId)
throws Exception {
@@ -501,33 +433,11 @@ public class HttpPartitionTest extends A
protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
NamedList rsp = solr.request(qr);
- String match =
- JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
+ String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+ " due to: " + match + "; rsp="+rsp, match == null);
}
- protected JettySolrRunner getJettyOnPort(int port) {
- JettySolrRunner theJetty = null;
- for (JettySolrRunner jetty : jettys) {
- if (port == jetty.getLocalPort()) {
- theJetty = jetty;
- break;
- }
- }
-
- if (theJetty == null) {
- if (controlJetty.getLocalPort() == port) {
- theJetty = controlJetty;
- }
- }
-
- if (theJetty == null)
- fail("Not able to find JettySolrRunner for port: "+port);
-
- return theJetty;
- }
-
protected int getReplicaPort(Replica replica) {
String replicaNode = replica.getNodeName();
String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
@@ -580,7 +490,7 @@ public class HttpPartitionTest extends A
if (!allReplicasUp)
fail("Didn't see replicas "+ replicasToCheck +
- " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
+ " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo(testCollectionName));
long diffMs = (System.currentTimeMillis() - startMs);
log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java?rev=1627347&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java Wed Sep 24 15:21:47 2014
@@ -0,0 +1,181 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.Replica;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests leader-initiated recovery scenarios after a leader node fails
+ * and one of the replicas is out-of-sync.
+ */
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {
+
+ public LeaderFailoverAfterPartitionTest() {
+ super();
+ }
+
+
+ @Override
+ public void doTest() throws Exception {
+ waitForThingsToLevelOut(30000);
+
+ // kill a leader and make sure recovery occurs as expected
+ testRf3WithLeaderFailover();
+ }
+
+ protected void testRf3WithLeaderFailover() throws Exception {
+ // now let's create a partition in one of the replicas and outright
+ // kill the leader ... see what happens
+ // create a collection that has 1 shard but 3 replicas
+ String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
+ createCollection(testCollectionName, 1, 3, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ sendDoc(1);
+
+ List<Replica> notLeaders =
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+ assertTrue("Expected 2 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName),
+ notLeaders.size() == 2);
+
+ // ok, now introduce a network partition between the leader and the replica
+ SocketProxy proxy0 = null;
+ proxy0 = getProxyForReplica(notLeaders.get(0));
+
+ proxy0.close();
+
+ // indexing during a partition
+ sendDoc(2);
+
+ Thread.sleep(sleepMsBeforeHealPartition);
+
+ proxy0.reopen();
+
+ SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+
+ proxy1.close();
+
+ sendDoc(3);
+
+ Thread.sleep(sleepMsBeforeHealPartition);
+ proxy1.reopen();
+
+ // sent 4 docs in so far, verify they are on the leader and replica
+ notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+
+ sendDoc(4);
+
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
+
+ Replica leader =
+ cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ String leaderNode = leader.getNodeName();
+ assertNotNull("Could not find leader for shard1 of "+
+ testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+
+ // since maxShardsPerNode is 1, we're safe to kill the leader
+ notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+ proxy0 = getProxyForReplica(notLeaders.get(0));
+ proxy0.close();
+
+ // indexing during a partition
+ // doc should be on leader and 1 replica
+ sendDoc(5);
+
+ assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
+ assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
+
+ Thread.sleep(sleepMsBeforeHealPartition);
+
+ String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+
+ //chaosMonkey.expireSession(leaderJetty);
+ // kill the leader
+ leaderJetty.stop();
+ if (leaderJetty.isRunning())
+ fail("Failed to stop the leader on "+leaderNode);
+
+ SocketProxy oldLeaderProxy = getProxyForReplica(leader);
+ if (oldLeaderProxy != null) {
+ oldLeaderProxy.close();
+ } else {
+ log.warn("No SocketProxy found for old leader node "+leaderNode);
+ }
+
+ Thread.sleep(10000); // give chance for new leader to be elected.
+
+ Replica newLeader =
+ cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
+
+ assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
+ printClusterStateInfo(testCollectionName),newLeader);
+
+ assertTrue("Expected node "+shouldNotBeNewLeaderNode+
+ " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
+ printClusterStateInfo(testCollectionName),
+ !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
+
+ proxy0.reopen();
+
+ long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
+ while (System.nanoTime() < timeout) {
+ cloudClient.getZkStateReader().updateClusterState(true);
+
+ List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+ if (activeReps.size() >= 2) break;
+ Thread.sleep(1000);
+ }
+
+ List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+ assertTrue("Expected 2 of 3 replicas to be active but only found "+
+ participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+
+ printClusterStateInfo(testCollectionName),
+ participatingReplicas.size() >= 2);
+
+ sendDoc(6);
+
+ Set<String> replicasToCheck = new HashSet<>();
+ for (Replica stillUp : participatingReplicas)
+ replicasToCheck.add(stillUp.getName());
+ waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+ assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+
+ // try to clean up
+ try {
+ CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+ req.setCollectionName(testCollectionName);
+ req.process(cloudClient);
+ } catch (Exception e) {
+ // don't fail the test
+ log.warn("Could not delete collection {} after test completed", testCollectionName);
+ }
+ }
+}
Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Wed Sep 24 15:21:47 2014
@@ -26,7 +26,9 @@ import static org.apache.solr.common.clo
import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.net.URI;
+import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -81,6 +83,8 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +127,8 @@ public abstract class AbstractFullDistri
private boolean cloudInit;
protected boolean checkCreatedVsState;
protected boolean useJettyDataDir = true;
+
+ protected Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
public static class CloudJettyRunner {
public JettySolrRunner jetty;
@@ -515,6 +521,77 @@ public abstract class AbstractFullDistri
return jetty;
}
+  /**
+   * Starts a {@link JettySolrRunner} and puts a {@link SocketProxy} in front of it,
+   * which lets tests simulate network partitions without having to fuss with IPTables.
+   * The new proxy is registered in {@code proxies} under the URL the proxy reports
+   * via {@code getUrl()}.
+   *
+   * @param solrHome           directory whose path is handed to the Jetty runner
+   * @param dataDir            passed through {@code getDataDir(...)} and set as the runner's data dir
+   * @param shardList          value passed to {@code setShards(...)} on the runner
+   * @param solrConfigOverride solrconfig override passed to the runner's constructor
+   * @param schemaOverride     schema override passed to the runner's constructor
+   * @return the started Jetty runner; its proxy is only reachable through {@code proxies}
+   * @throws Exception if the port lookup, the Jetty start-up or the proxy creation fails
+   */
+  public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
+      String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception {
+
+    final JettySolrRunner proxiedJetty = new JettySolrRunner(solrHome.getPath(), context,
+        0, solrConfigOverride, schemaOverride, false,
+        getExtraServlets(), sslConfig, getExtraRequestFilters());
+    proxiedJetty.setShards(shardList);
+    proxiedJetty.setDataDir(getDataDir(dataDir));
+
+    // choose the port the proxy will listen on and tell the runner about it
+    // before the runner is started
+    final int portForProxy = getNextAvailablePort();
+    proxiedJetty.setProxyPort(portForProxy);
+    proxiedJetty.start();
+
+    // the proxy is created only once Jetty is up, targeting Jetty's base URL
+    final URI jettyBaseUri = proxiedJetty.getBaseUrl().toURI();
+    final SocketProxy socketProxy = new SocketProxy(portForProxy, jettyBaseUri);
+    proxies.put(socketProxy.getUrl(), socketProxy);
+
+    return proxiedJetty;
+  }
+
+  /**
+   * Looks up the Jetty runner whose local port equals {@code port}. The runners in
+   * {@code jettys} are checked first (first match wins), then {@code controlJetty}.
+   * If neither matches, the test is failed via {@code fail(...)}.
+   *
+   * @param port local port to search for
+   * @return the matching runner
+   */
+  protected JettySolrRunner getJettyOnPort(int port) {
+    for (JettySolrRunner candidate : jettys) {
+      if (candidate.getLocalPort() == port) {
+        return candidate;
+      }
+    }
+
+    // the control server is only consulted when no regular runner matched
+    if (controlJetty.getLocalPort() == port) {
+      return controlJetty;
+    }
+
+    fail("Not able to find JettySolrRunner for port: "+port);
+    return null;
+  }
+
+ protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+ String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+ assertNotNull(replicaBaseUrl);
+ URL baseUrl = new URL(replicaBaseUrl);
+
+ SocketProxy proxy = proxies.get(baseUrl.toURI());
+ if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+ baseUrl = new URL(baseUrl.toExternalForm() + "/");
+ proxy = proxies.get(baseUrl.toURI());
+ }
+ assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+ return proxy;
+ }
+
+  /**
+   * Asks the OS for a free port by binding a {@link ServerSocket} to port 0, reading
+   * the port it was given, and closing the socket again.
+   *
+   * @return the local port the temporary socket was bound to
+   * @throws Exception if the socket cannot be opened or closed
+   */
+  protected int getNextAvailablePort() throws Exception {
+    try (ServerSocket probe = new ServerSocket(0)) {
+      // the socket is closed by try-with-resources once the port number has been read
+      return probe.getLocalPort();
+    }
+  }
+
private File getRelativeSolrHomePath(File solrHome) {
String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
String base = new File(solrHome.getPath()).getAbsolutePath();
@@ -1467,6 +1544,13 @@ public abstract class AbstractFullDistri
System.clearProperty("zkHost");
System.clearProperty("numShards");
+
+ // close socket proxies after super.tearDown
+ if (!proxies.isEmpty()) {
+ for (SocketProxy proxy : proxies.values()) {
+ proxy.close();
+ }
+ }
}
@Override
@@ -1860,7 +1944,23 @@ public abstract class AbstractFullDistri
}
+ /** Prints the state of every collection by delegating to {@code printClusterStateInfo(null)}. */
protected String printClusterStateInfo() throws Exception {
+ return printClusterStateInfo(null);
+ }
+
+ /**
+ * Calls {@code updateClusterState(true)} on the cloud client's ZkStateReader and then
+ * renders the cluster state it reports as a string.
+ *
+ * @param collection name of the collection to print, or {@code null} to print all collections
+ * @return the {@code toString()} of the named collection, or, when {@code collection} is
+ * {@code null}, the JSONWriter output for a map of collection name to collection
+ * @throws Exception declared by the signature; the visible code does not show which call throws
+ */
+ protected String printClusterStateInfo(String collection) throws Exception {
cloudClient.getZkStateReader().updateClusterState(true);
- return String.valueOf(cloudClient.getZkStateReader().getClusterState());
- }
+ String cs = null;
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ if (collection != null) {
+ // single collection requested: use its own string form
+ cs = clusterState.getCollection(collection).toString();
+ } else {
+ // no collection given: gather every collection and serialize the whole map
+ Map<String,DocCollection> map = new HashMap<String,DocCollection>();
+ for (String coll : clusterState.getCollections())
+ map.put(coll, clusterState.getCollection(coll));
+ CharArr out = new CharArr();
+ // NOTE(review): the 2 is presumably the JSON indent size -- confirm against noggit's JSONWriter
+ new JSONWriter(out, 2).write(map);
+ cs = out.toString();
+ }
+ return cs;
+ }
}