You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2014/10/08 20:41:39 UTC
svn commit: r1630196 - in /lucene/dev/branches/lucene_solr_4_10/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/cloud/
test-framework/src/java/org/apache/solr/cloud/
Author: thelabdude
Date: Wed Oct 8 18:41:39 2014
New Revision: 1630196
URL: http://svn.apache.org/r1630196
Log:
SOLR-6511: backport to 4.10 branch
Added:
lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java (with props)
lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java (with props)
Removed:
lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
Modified:
lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt
lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
Modified: lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt Wed Oct 8 18:41:39 2014
@@ -19,6 +19,12 @@ See the tutorial at http://lucene.apache
================== 4.10.2 ==================
+Bug Fixes
+----------------------
+
+* SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)
+
+
Other Changes
----------------------
@@ -296,6 +302,7 @@ Optimizations
* SOLR-6261: Run ZooKeeper watch event callbacks in parallel to the ZooKeeper
event thread. (Ramkumar Aiyengar via Mark Miller)
+
Other Changes
---------------------
@@ -438,7 +445,6 @@ New Features
* SOLR-6064: DebugComponent track output should be returned as a JSON
object rather than a list (Christine Poerschke, Alan Woodward)
-
Bug Fixes
----------------------
Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Oct 8 18:41:39 2014
@@ -382,7 +382,8 @@ final class ShardLeaderElectionContext e
collection,
shardId,
coreNodeProps,
- 120);
+ 120,
+ coreNodeName);
zkController.ensureReplicaInLeaderInitiatedRecovery(
collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Oct 8 18:41:39 2014
@@ -50,13 +50,15 @@ public class LeaderInitiatedRecoveryThre
protected String shardId;
protected ZkCoreNodeProps nodeProps;
protected int maxTries;
+ protected String leaderCoreNodeName;
public LeaderInitiatedRecoveryThread(ZkController zkController,
CoreContainer cc,
String collection,
String shardId,
ZkCoreNodeProps nodeProps,
- int maxTries)
+ int maxTries,
+ String leaderCoreNodeName)
{
super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
this.zkController = zkController;
@@ -65,6 +67,7 @@ public class LeaderInitiatedRecoveryThre
this.shardId = shardId;
this.nodeProps = nodeProps;
this.maxTries = maxTries;
+ this.leaderCoreNodeName = leaderCoreNodeName;
setDaemon(true);
}
@@ -103,7 +106,7 @@ public class LeaderInitiatedRecoveryThre
recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
recoverRequestCmd.setCoreName(coreNeedingRecovery);
- while (continueTrying && ++tries < maxTries) {
+ while (continueTrying && ++tries <= maxTries) {
if (tries > 1) {
log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
" to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
@@ -170,6 +173,24 @@ public class LeaderInitiatedRecoveryThre
break;
}
+ // stop trying if I'm no longer the leader
+ if (leaderCoreNodeName != null && collection != null) {
+ String leaderCoreNodeNameFromZk = null;
+ try {
+ leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
+ } catch (Exception exc) {
+ log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
+ " " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
+ }
+ if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
+ log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+ ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+ leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
+ continueTrying = false;
+ break;
+ }
+ }
+
// additional safeguard against the replica trying to be in the active state
// before acknowledging the leader initiated recovery command
if (continueTrying && collection != null && shardId != null) {
Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Oct 8 18:41:39 2014
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -74,6 +75,8 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.data.Stat;
+import org.noggit.JSONParser;
+import org.noggit.ObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1884,16 +1887,19 @@ public final class ZkController {
}
public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
-
+ Map<String,Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+ return (stateObj != null) ? (String)stateObj.get("state") : null;
+ }
+
+ public Map<String,Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
+
if (collection == null || shardId == null || coreNodeName == null)
return null; // if we don't have complete data about a core in cloud mode, return null
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
- String state = null;
+ byte[] stateData = null;
try {
- byte[] data = zkClient.getData(znodePath, null, new Stat(), false);
- if (data != null && data.length > 0)
- state = new String(data, "UTF-8");
+ stateData = zkClient.getData(znodePath, null, new Stat(), false);
} catch (NoNodeException ignoreMe) {
// safe to ignore as this znode will only exist if the leader initiated recovery
} catch (ConnectionLossException cle) {
@@ -1904,8 +1910,6 @@ public final class ZkController {
// sort of safe to ignore ??? Usually these are seen when the core is going down
// or there are bigger issues to deal with than reading this znode
log.warn("Unable to read "+znodePath+" due to: "+see);
- } catch (UnsupportedEncodingException e) {
- throw new Error("JVM Does not seem to support UTF-8", e);
} catch (Exception exc) {
log.error("Failed to read data from znode "+znodePath+" due to: "+exc);
if (exc instanceof SolrException) {
@@ -1915,9 +1919,14 @@ public final class ZkController {
"Failed to read data from znodePath: "+znodePath, exc);
}
}
- return state;
+
+ Map<String,Object> stateObj = null;
+ if (stateData != null && stateData.length > 0)
+ stateObj = (Map<String, Object>) ZkStateReader.fromJSON(stateData);
+
+ return stateObj;
}
-
+
private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
if (collection == null || shardId == null || coreNodeName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
@@ -1926,7 +1935,7 @@ public final class ZkController {
}
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
-
+
if (ZkStateReader.ACTIVE.equals(state)) {
// since we're marking it active, we don't need this znode anymore, so delete instead of update
try {
@@ -1936,14 +1945,22 @@ public final class ZkController {
}
return;
}
-
- byte[] znodeData = null;
+
+ Map<String,Object> stateObj = null;
try {
- znodeData = state.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new Error("JVM Does not seem to support UTF-8", e);
+ stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+ } catch (Exception exc) {
+ log.warn(exc.getMessage(), exc);
}
+ if (stateObj == null)
+ stateObj = new LinkedHashMap<String,Object>();
+ stateObj.put("state", state);
+ // only set the createdByNodeName value if it's not already set
+ if (stateObj.get("createdByNodeName") == null)
+ stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
+
+ byte[] znodeData = ZkStateReader.toJSON(stateObj);
boolean retryOnConnLoss = true; // be a little more robust when trying to write data
try {
if (zkClient.exists(znodePath, retryOnConnLoss)) {
@@ -1956,8 +1973,8 @@ public final class ZkController {
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Failed to update data to "+state+" for znode: "+znodePath, exc);
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Failed to update data to "+state+" for znode: "+znodePath, exc);
}
}
}
Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Oct 8 18:41:39 2014
@@ -593,7 +593,9 @@ public class DistributedUpdateProcessor
String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
if (fromCollection == null) {
log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+ SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+ solrExc.setMetadata("cause", "LeaderChanged");
+ throw solrExc;
}
}
}
@@ -803,57 +805,91 @@ public class DistributedUpdateProcessor
DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
if (phase != DistribPhase.FROMLEADER)
continue; // don't have non-leaders try to recovery other nodes
-
- final String replicaUrl = error.req.node.getUrl();
+
+ final String replicaUrl = error.req.node.getUrl();
+
+ // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
+ String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+ if ("LeaderChanged".equals(cause)) {
+ // let's just fail this request and let the client retry? or just call processAdd again?
+ log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
+ " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+ rsp.setException(error.e);
+ break;
+ }
int maxTries = 1;
boolean sendRecoveryCommand = true;
String collection = null;
String shardId = null;
-
+
if (error.req.node instanceof StdNode) {
StdNode stdNode = (StdNode)error.req.node;
collection = stdNode.getCollection();
shardId = stdNode.getShardId();
+
+ // before we go setting other replicas to down, make sure we're still the leader!
+ String leaderCoreNodeName = null;
try {
- // if false, then the node is probably not "live" anymore
- sendRecoveryCommand =
- zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
- shardId,
- replicaUrl,
- stdNode.getNodeProps(),
- false);
-
- // we want to try more than once, ~10 minutes
- if (sendRecoveryCommand) {
- maxTries = 120;
- } // else the node is no longer "live" so no need to send any recovery command
-
- } catch (Exception e) {
- log.error("Leader failed to set replica "+
- error.req.node.getUrl()+" state to DOWN due to: "+e, e);
+ leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+ } catch (Exception exc) {
+ log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
+ " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+ }
+
+ if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
+ try {
+ // if false, then the node is probably not "live" anymore
+ sendRecoveryCommand =
+ zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
+ shardId,
+ replicaUrl,
+ stdNode.getNodeProps(),
+ false);
+
+ // we want to try more than once, ~10 minutes
+ if (sendRecoveryCommand) {
+ maxTries = 120;
+ } // else the node is no longer "live" so no need to send any recovery command
+ } catch (Exception exc) {
+ Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
+ log.error("Leader failed to set replica " +
+ error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
+ if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException ||
+ setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException) {
+ // our ZooKeeper session expired or the connection was lost, which means our state is
+ // suspect, so don't go putting other replicas in recovery (see SOLR-6511)
+ sendRecoveryCommand = false;
+ } // else will go ahead and try to send the recovery command once after this error
+ }
+ } else {
+ // not the leader anymore maybe?
+ sendRecoveryCommand = false;
+ log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
+ shardId+", no request recovery command will be sent!");
}
} // else not a StdNode, recovery command still gets sent once
if (!sendRecoveryCommand)
continue; // the replica is already in recovery handling or is not live
-
- Throwable rootCause = SolrException.getRootCause(error.e);
- log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
-
+
+ Throwable rootCause = SolrException.getRootCause(error.e);
+ log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+
// try to send the recovery command to the downed replica in a background thread
- CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
- LeaderInitiatedRecoveryThread lirThread =
+ CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+ LeaderInitiatedRecoveryThread lirThread =
new LeaderInitiatedRecoveryThread(zkController,
- coreContainer,
- collection,
- shardId,
- error.req.node.getNodeProps(),
- maxTries);
+ coreContainer,
+ collection,
+ shardId,
+ error.req.node.getNodeProps(),
+ maxTries,
+ cloudDesc.getCoreNodeName()); // core node name of current leader
ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
- executor.execute(lirThread);
+ executor.execute(lirThread);
}
-
+
if (replicationTracker != null) {
rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Oct 8 18:41:39 2014
@@ -18,17 +18,15 @@ package org.apache.solr.cloud;
*/
import java.io.File;
-import java.net.ServerSocket;
-import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -36,6 +34,8 @@ import org.apache.solr.client.solrj.embe
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
@@ -52,26 +52,24 @@ import org.slf4j.LoggerFactory;
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
*/
+
@Slow
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
- private static final transient Logger log =
+ protected static final transient Logger log =
LoggerFactory.getLogger(HttpPartitionTest.class);
// To prevent the test assertions firing too fast before cluster state
// recognizes (and propagates) partitions
- private static final long sleepMsBeforeHealPartition = 2000L;
-
- private static final int maxWaitSecsToSeeAllActive = 30;
-
- private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-
+ protected static final long sleepMsBeforeHealPartition = 2000L;
+
+ protected static final int maxWaitSecsToSeeAllActive = 30;
+
public HttpPartitionTest() {
super();
sliceCount = 2;
- shardCount = 2;
+ shardCount = 3;
}
@Before
@@ -84,73 +82,45 @@ public class HttpPartitionTest extends A
@Override
@After
public void tearDown() throws Exception {
- System.clearProperty("numShards");
-
try {
super.tearDown();
} catch (Exception exc) {}
resetExceptionIgnores();
-
- // close socket proxies after super.tearDown
- if (!proxies.isEmpty()) {
- for (SocketProxy proxy : proxies.values()) {
- proxy.close();
- }
- }
}
/**
- * Overrides the parent implementation so that we can configure a socket proxy
- * to sit infront of each Jetty server, which gives us the ability to simulate
- * network partitions without having to fuss with IPTables (which is not very
- * cross platform friendly).
+ * Overrides the parent implementation to install a SocketProxy in front of the Jetty server.
*/
@Override
public JettySolrRunner createJetty(File solrHome, String dataDir,
String shardList, String solrConfigOverride, String schemaOverride)
- throws Exception {
-
- JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
- 0, solrConfigOverride, schemaOverride, false,
- getExtraServlets(), sslConfig, getExtraRequestFilters());
- jetty.setShards(shardList);
- jetty.setDataDir(getDataDir(dataDir));
-
- // setup to proxy Http requests to this server unless it is the control
- // server
- int proxyPort = getNextAvailablePort();
- jetty.setProxyPort(proxyPort);
- jetty.start();
-
- // create a socket proxy for the jetty server ...
- SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
- proxies.put(proxy.getUrl(), proxy);
-
- return jetty;
+ throws Exception
+ {
+ return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
}
-
- protected int getNextAvailablePort() throws Exception {
- int port = -1;
- try (ServerSocket s = new ServerSocket(0)) {
- port = s.getLocalPort();
- }
- return port;
- }
-
+
@Override
public void doTest() throws Exception {
waitForThingsToLevelOut(30000);
-
+
// test a 1x2 collection
testRf2();
+ waitForThingsToLevelOut(30000);
+
// now do similar for a 1x3 collection while taking 2 replicas on-and-off
// each time
testRf3();
- // kill a leader and make sure recovery occurs as expected
- testRf3WithLeaderFailover();
+ waitForThingsToLevelOut(30000);
+
+ // have the leader lose its Zk session temporarily
+ testLeaderZkSessionLoss();
+
+ waitForThingsToLevelOut(30000);
+
+ log.info("HttpParitionTest succeeded ... shutting down now!");
}
protected void testRf2() throws Exception {
@@ -162,7 +132,7 @@ public class HttpPartitionTest extends A
sendDoc(1);
Replica notLeader =
- ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive).get(0);
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy = getProxyForReplica(notLeader);
@@ -184,7 +154,7 @@ public class HttpPartitionTest extends A
proxy.reopen();
List<Replica> notLeaders =
- ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive);
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
sendDoc(3);
@@ -216,11 +186,13 @@ public class HttpPartitionTest extends A
proxy.reopen();
}
- notLeaders = ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive);
+ notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
// verify all docs received
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3);
+ log.info("testRf2 succeeded ... deleting the "+testCollectionName+" collection");
+
// try to clean up
try {
CollectionAdminRequest req = new CollectionAdminRequest.Delete();
@@ -241,14 +213,12 @@ public class HttpPartitionTest extends A
sendDoc(1);
List<Replica> notLeaders =
- ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 replicas for collection " + testCollectionName
+ " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(),
+ + printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);
-
- sendDoc(1);
-
+
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
@@ -271,11 +241,14 @@ public class HttpPartitionTest extends A
proxy1.reopen();
// sent 4 docs in so far, verify they are on the leader and replica
- notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
+ notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
sendDoc(4);
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
+
+ log.info("testRf3 succeeded ... deleting the "+testCollectionName+" collection");
+
// try to clean up
try {
CollectionAdminRequest req = new CollectionAdminRequest.Delete();
@@ -286,127 +259,103 @@ public class HttpPartitionTest extends A
log.warn("Could not delete collection {} after test completed", testCollectionName);
}
}
-
- protected void testRf3WithLeaderFailover() throws Exception {
- // now let's create a partition in one of the replicas and outright
- // kill the leader ... see what happens
- // create a collection that has 1 shard but 3 replicas
- String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
- createCollection(testCollectionName, 1, 3, 1);
+
+ // test inspired by SOLR-6511
+ protected void testLeaderZkSessionLoss() throws Exception {
+
+ String testCollectionName = "c8n_1x2_leader_session_loss";
+ createCollection(testCollectionName, 1, 2, 1);
cloudClient.setDefaultCollection(testCollectionName);
-
- sendDoc(1);
-
- List<Replica> notLeaders =
- ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
- assertTrue("Expected 2 replicas for collection " + testCollectionName
- + " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(),
- notLeaders.size() == 2);
-
+
sendDoc(1);
-
- // ok, now introduce a network partition between the leader and the replica
- SocketProxy proxy0 = null;
- proxy0 = getProxyForReplica(notLeaders.get(0));
-
- proxy0.close();
-
- // indexing during a partition
- sendDoc(2);
-
- Thread.sleep(sleepMsBeforeHealPartition);
-
- proxy0.reopen();
-
- SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-
- proxy1.close();
-
- sendDoc(3);
-
- Thread.sleep(sleepMsBeforeHealPartition);
- proxy1.reopen();
-
- // sent 4 docs in so far, verify they are on the leader and replica
- notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
-
- sendDoc(4);
-
- assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
-
- Replica leader =
+
+ List<Replica> notLeaders =
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+ assertTrue("Expected 1 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName),
+ notLeaders.size() == 1);
+
+ Replica leader =
cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
String leaderNode = leader.getNodeName();
assertNotNull("Could not find leader for shard1 of "+
- testCollectionName+"; clusterState: "+printClusterStateInfo(), leader);
+ testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-
- // since maxShardsPerNode is 1, we're safe to kill the leader
- notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
- proxy0 = getProxyForReplica(notLeaders.get(0));
- proxy0.close();
-
- // indexing during a partition
- // doc should be on leader and 1 replica
- sendDoc(5);
-
- Thread.sleep(sleepMsBeforeHealPartition);
-
- String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
- // kill the leader
- leaderJetty.stop();
-
- if (leaderJetty.isRunning())
- fail("Failed to stop the leader on "+leaderNode);
-
- SocketProxy oldLeaderProxy = getProxyForReplica(leader);
- if (oldLeaderProxy != null) {
- oldLeaderProxy.close();
- } else {
- log.warn("No SocketProxy found for old leader node "+leaderNode);
- }
+ HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField(id, String.valueOf(2));
+ doc.addField("a_t", "hello" + 2);
- Thread.sleep(10000); // give chance for new leader to be elected.
-
- Replica newLeader =
- cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
-
- assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
- printClusterStateInfo(),newLeader);
-
- assertTrue("Expected node "+shouldNotBeNewLeaderNode+
- " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
- printClusterStateInfo(),
- !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
-
- proxy0.reopen();
-
+ // cause leader migration by expiring the current leader's zk session
+ chaosMonkey.expireSession(leaderJetty);
+
+ String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
while (System.nanoTime() < timeout) {
- cloudClient.getZkStateReader().updateClusterState(true);
+ String currentLeaderName = null;
+ try {
+ Replica currentLeader =
+ cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ currentLeaderName = currentLeader.getName();
+ } catch (Exception exc) {}
+
+ if (expectedNewLeaderCoreNodeName.equals(currentLeaderName))
+ break; // new leader was elected after zk session expiration
- List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
- if (activeReps.size() == 2) break;
- Thread.sleep(1000);
+ Thread.sleep(500);
}
- List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
- assertTrue("Expected 2 of 3 replicas to be active but only found "+
- activeReps.size()+"; "+activeReps+"; clusterState: "+printClusterStateInfo(),
- activeReps.size() == 2);
+ Replica currentLeader =
+ cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ assertEquals(expectedNewLeaderCoreNodeName, currentLeader.getName());
- sendDoc(6);
+ // TODO: This test logic seems to be timing dependent and fails on Jenkins
+ // need to come up with a better approach
+ log.info("Sending doc 2 to old leader "+leader.getName());
+ try {
+ leaderSolr.add(doc);
+ leaderSolr.shutdown();
- assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6);
- }
-
- protected String printClusterStateInfo() throws Exception {
- cloudClient.getZkStateReader().updateClusterState(true);
- return String.valueOf(cloudClient.getZkStateReader().getClusterState());
+ // if the add worked, then the doc must exist on the new leader
+ HttpSolrServer newLeaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+ try {
+ assertDocExists(newLeaderSolr, testCollectionName, "2");
+ } finally {
+ newLeaderSolr.shutdown();
+ }
+
+ } catch (SolrException exc) {
+ // this is ok provided the doc doesn't exist on the current leader
+ leaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+ try {
+ leaderSolr.add(doc); // this should work
+ } finally {
+ leaderSolr.shutdown();
+ }
+ }
+
+ List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+ Set<String> replicasToCheck = new HashSet<>();
+ for (Replica stillUp : participatingReplicas)
+ replicasToCheck.add(stillUp.getName());
+ waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+ assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 2);
+
+ log.info("testLeaderZkSessionLoss succeeded ... deleting the "+testCollectionName+" collection");
+
+ // try to clean up
+ try {
+ CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+ req.setCollectionName(testCollectionName);
+ req.process(cloudClient);
+ } catch (Exception e) {
+ // don't fail the test
+ log.warn("Could not delete collection {} after test completed", testCollectionName);
+ }
}
-
+
protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {
Map<String,Replica> activeReplicas = new HashMap<String,Replica>();
ZkStateReader zkr = cloudClient.getZkStateReader();
@@ -426,21 +375,7 @@ public class HttpPartitionTest extends A
replicas.addAll(activeReplicas.values());
return replicas;
}
-
- protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
- String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
- assertNotNull(replicaBaseUrl);
- URL baseUrl = new URL(replicaBaseUrl);
-
- SocketProxy proxy = proxies.get(baseUrl.toURI());
- if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
- baseUrl = new URL(baseUrl.toExternalForm() + "/");
- proxy = proxies.get(baseUrl.toURI());
- }
- assertNotNull("No proxy found for " + baseUrl + "!", proxy);
- return proxy;
- }
-
+
protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
String testCollectionName, int firstDocId, int lastDocId)
throws Exception {
@@ -478,112 +413,84 @@ public class HttpPartitionTest extends A
}
protected void sendDoc(int docId) throws Exception {
+ UpdateRequest up = new UpdateRequest();
+ up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(2));
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, String.valueOf(docId));
doc.addField("a_t", "hello" + docId);
- cloudClient.add(doc);
- }
-
- protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, int rf, int maxWaitSecs) throws Exception {
- long startMs = System.currentTimeMillis();
-
- Map<String,Replica> notLeaders = new HashMap<String,Replica>();
-
- ZkStateReader zkr = cloudClient.getZkStateReader();
- ClusterState cs = zkr.getClusterState();
- Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
- assertTrue(slices.size() == 1); // shards == 1
- boolean allReplicasUp = false;
- long waitMs = 0L;
- long maxWaitMs = maxWaitSecs * 1000L;
- Replica leader = null;
- while (waitMs < maxWaitMs && !allReplicasUp) {
- cs = zkr.getClusterState();
- assertNotNull(cs);
- for (Slice shard : cs.getActiveSlices(testCollectionName)) {
- allReplicasUp = true; // assume true
- Collection<Replica> replicas = shard.getReplicas();
- assertTrue(replicas.size() == rf);
- leader = shard.getLeader();
- assertNotNull(leader);
-
- // ensure all replicas are "active" and identify the non-leader replica
- for (Replica replica : replicas) {
- String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
- if (!ZkStateReader.ACTIVE.equals(replicaState)) {
- log.info("Replica " + replica.getName() + " is currently " + replicaState);
- allReplicasUp = false;
- }
-
- if (!leader.equals(replica))
- notLeaders.put(replica.getName(), replica);
- }
-
- if (!allReplicasUp) {
- try {
- Thread.sleep(500L);
- } catch (Exception ignoreMe) {}
- waitMs += 500L;
- }
- }
- } // end while
-
- if (!allReplicasUp)
- fail("Didn't see all replicas come up within " + maxWaitMs +
- " ms! ClusterState: " + printClusterStateInfo());
-
- if (notLeaders.isEmpty())
- fail("Didn't isolate any replicas that are not the leader! ClusterState: " +
- printClusterStateInfo());
-
- long diffMs = (System.currentTimeMillis() - startMs);
- log.info("Took " + diffMs + " ms to see all replicas become active.");
-
- List<Replica> replicas = new ArrayList<Replica>();
- replicas.addAll(notLeaders.values());
- return replicas;
+ up.add(doc);
+ int minAchievedRf =
+ cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
}
/**
* Query the real-time get handler for a specific doc by ID to verify it
- * exists in the provided server.
+ * exists in the provided server, using distrib=false so it doesn't route to another replica.
*/
@SuppressWarnings("rawtypes")
protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
- QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId));
+ QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
NamedList rsp = solr.request(qr);
- String match =
- JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
+ String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
- + " due to: " + match, match == null);
+ + " due to: " + match + "; rsp="+rsp, match == null);
}
-
- protected JettySolrRunner getJettyOnPort(int port) {
- JettySolrRunner theJetty = null;
- for (JettySolrRunner jetty : jettys) {
- if (port == jetty.getLocalPort()) {
- theJetty = jetty;
- break;
- }
- }
-
- if (theJetty == null) {
- if (controlJetty.getLocalPort() == port) {
- theJetty = controlJetty;
- }
- }
-
- if (theJetty == null)
- fail("Not able to find JettySolrRunner for port: "+port);
-
- return theJetty;
- }
-
+
protected int getReplicaPort(Replica replica) {
String replicaNode = replica.getNodeName();
String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
if (tmp.indexOf('_') != -1)
tmp = tmp.substring(0,tmp.indexOf('_'));
return Integer.parseInt(tmp);
- }
+ }
+
+ /**
+  * Polls the cluster state until every replica of the given shard whose name is in
+  * {@code replicasToCheck} reports the "active" state; replicas of the shard that
+  * are not named in the set are ignored. Fails the test when that has not happened
+  * after {@code maxWaitSecs} seconds of polling.
+  *
+  * @param testCollectionName collection that owns the shard
+  * @param shardId            name of the shard (slice) to inspect
+  * @param replicasToCheck    names of the replicas that must become active
+  * @param maxWaitSecs        upper bound on the time spent sleeping between polls, in seconds
+  * @throws Exception if the cluster state cannot be read or the waiting thread is interrupted
+  */
+ protected void waitToSeeReplicasActive(String testCollectionName, String shardId, Set<String> replicasToCheck, int maxWaitSecs) throws Exception {
+   long startMs = System.currentTimeMillis();
+
+   ZkStateReader zkr = cloudClient.getZkStateReader();
+
+   boolean allReplicasUp = false;
+   long waitMs = 0L;
+   long maxWaitMs = maxWaitSecs * 1000L;
+   while (waitMs < maxWaitMs && !allReplicasUp) {
+     // refresh state every 2 secs (this also covers the very first pass, where waitMs == 0)
+     if (waitMs % 2000 == 0)
+       zkr.updateClusterState(true);
+
+     ClusterState cs = zkr.getClusterState();
+     assertNotNull(cs);
+     Slice shard = cs.getSlice(testCollectionName, shardId);
+     assertNotNull("No Slice for "+shardId, shard);
+     allReplicasUp = true; // assume true
+
+     // wait to see all replicas are "active"
+     for (Replica replica : shard.getReplicas()) {
+       if (!replicasToCheck.contains(replica.getName()))
+         continue;
+
+       String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+       if (!ZkStateReader.ACTIVE.equals(replicaState)) {
+         log.info("Replica " + replica.getName() + " is currently " + replicaState);
+         allReplicasUp = false;
+       }
+     }
+
+     if (!allReplicasUp) {
+       // an interrupt is deliberately allowed to propagate: swallowing it would keep
+       // this loop polling for the full timeout after the test has been cancelled
+       Thread.sleep(1000L);
+       waitMs += 1000L;
+     }
+   } // end while
+
+   if (!allReplicasUp)
+     fail("Didn't see replicas "+ replicasToCheck +
+         " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo(testCollectionName));
+
+   long diffMs = (System.currentTimeMillis() - startMs);
+   log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");
+ }
}
Added: lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java?rev=1630196&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java Wed Oct 8 18:41:39 2014
@@ -0,0 +1,182 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.Replica;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests leader-initiated recovery scenarios after a leader node fails
+ * and one of the replicas is out-of-sync.
+ */
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {
+
+  public LeaderFailoverAfterPartitionTest() {
+    super();
+  }
+
+  @Override
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(30000);
+
+    // kill a leader and make sure recovery occurs as expected
+    testRf3WithLeaderFailover();
+  }
+
+  /**
+   * Creates a 1-shard / 3-replica collection, partitions each non-leader replica in
+   * turn while indexing, then partitions one replica again, kills the leader and
+   * verifies that the replica which missed an update is not elected as the new
+   * leader. Finally checks that all six docs exist on every replica that is still
+   * participating.
+   */
+  protected void testRf3WithLeaderFailover() throws Exception {
+    // now let's create a partition in one of the replicas and outright
+    // kill the leader ... see what happens
+    // create a collection that has 1 shard but 3 replicas
+    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    sendDoc(1);
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 2);
+
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+
+    proxy0.close();
+
+    // indexing during a partition
+    sendDoc(2);
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    proxy0.reopen();
+
+    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+
+    proxy1.close();
+
+    sendDoc(3);
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+    proxy1.reopen();
+
+    // sent 3 docs in so far; wait for all replicas to recover, send a 4th and
+    // verify all 4 are on the leader and the replicas
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+
+    sendDoc(4);
+
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
+
+    Replica leader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    // check for null before the leader is dereferenced below
+    assertNotNull("Could not find leader for shard1 of "+
+        testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
+    String leaderNode = leader.getNodeName();
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+
+    // since maxShardsPerNode is 1, we're safe to kill the leader
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    proxy0.close();
+
+    // indexing during a partition
+    // doc should be on leader and 1 replica
+    sendDoc(5);
+
+    assertDocExistsOnReplica(leader, testCollectionName, "5");
+    assertDocExistsOnReplica(notLeaders.get(1), testCollectionName, "5");
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+
+    //chaosMonkey.expireSession(leaderJetty);
+    // kill the leader
+    leaderJetty.stop();
+    if (leaderJetty.isRunning())
+      fail("Failed to stop the leader on "+leaderNode);
+
+    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
+    if (oldLeaderProxy != null) {
+      oldLeaderProxy.close();
+    } else {
+      log.warn("No SocketProxy found for old leader node "+leaderNode);
+    }
+
+    Thread.sleep(10000); // give chance for new leader to be elected.
+
+    Replica newLeader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
+
+    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
+        printClusterStateInfo(testCollectionName),newLeader);
+
+    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
+        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
+        printClusterStateInfo(testCollectionName),
+        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
+
+    proxy0.reopen();
+
+    // wait up to 60 seconds for at least two replicas to be active or recovering
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
+    while (System.nanoTime() < timeout) {
+      cloudClient.getZkStateReader().updateClusterState(true);
+
+      List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+      if (activeReps.size() >= 2) break;
+      Thread.sleep(1000);
+    }
+
+    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    assertTrue("Expected 2 of 3 replicas to be active but only found "+
+        participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+
+        printClusterStateInfo(testCollectionName),
+        participatingReplicas.size() >= 2);
+
+    sendDoc(6);
+
+    Set<String> replicasToCheck = new HashSet<>();
+    for (Replica stillUp : participatingReplicas)
+      replicasToCheck.add(stillUp.getName());
+    waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test, but keep the cause visible in the log
+      log.warn("Could not delete collection {} after test completed", testCollectionName, e);
+    }
+  }
+
+  /**
+   * Asserts that the doc with the given id exists on one specific replica and always
+   * shuts down the client created for the check, so no HTTP client is leaked.
+   */
+  private void assertDocExistsOnReplica(Replica replica, String coll, String docId) throws Exception {
+    // fully qualified because this file has no import for HttpSolrServer
+    org.apache.solr.client.solrj.impl.HttpSolrServer solr = getHttpSolrServer(replica, coll);
+    try {
+      assertDocExists(solr, coll, docId);
+    } finally {
+      solr.shutdown();
+    }
+  }
+}
+
Modified: lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Wed Oct 8 18:41:39 2014
@@ -25,9 +25,12 @@ import static org.apache.solr.common.clo
import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.net.MalformedURLException;
import java.net.URI;
+import java.net.URL;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -78,6 +81,8 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -130,6 +135,8 @@ public abstract class AbstractFullDistri
private boolean cloudInit;
protected boolean checkCreatedVsState;
protected boolean useJettyDataDir = true;
+
+ protected Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
public static class CloudJettyRunner {
public JettySolrRunner jetty;
@@ -503,6 +510,85 @@ public abstract class AbstractFullDistri
return jetty;
}
+ /**
+  * Creates a JettySolrRunner with a socket proxy sitting in front of the Jetty server,
+  * which gives us the ability to simulate network partitions without having to fuss
+  * with IPTables.
+  * <p>
+  * The proxy is registered in {@link #proxies} under the URI it listens on. If the
+  * proxy cannot be created after Jetty has been started, Jetty is stopped again
+  * before the failure is rethrown, so a server the caller never receives is not
+  * left running.
+  *
+  * @return the started Jetty runner
+  * @throws Exception if Jetty cannot be started or the proxy cannot be opened
+  */
+ public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
+     String shardList, String solrConfigOverride, String schemaOverride)
+     throws Exception {
+
+   JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
+       0, solrConfigOverride, schemaOverride, false,
+       getExtraServlets(), sslConfig, getExtraRequestFilters());
+   jetty.setShards(shardList);
+   jetty.setDataDir(getDataDir(dataDir));
+
+   // pick the port the proxy will listen on and hand it to Jetty before it starts
+   int proxyPort = getNextAvailablePort();
+   jetty.setProxyPort(proxyPort);
+   jetty.start();
+
+   try {
+     // create a socket proxy for the jetty server ...
+     SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+     proxies.put(proxy.getUrl(), proxy);
+   } catch (Exception e) {
+     // the proxy port may have been taken since it was probed; do not leak the server
+     try {
+       jetty.stop();
+     } catch (Exception stopFailure) {
+       e.addSuppressed(stopFailure);
+     }
+     throw e;
+   }
+
+   return jetty;
+ }
+
+ /**
+  * Parses the port number out of the replica's node name: the text after the first
+  * ':' and, when an '_' follows, before that '_'.
+  *
+  * @throws NumberFormatException if that portion of the node name is not an integer
+  */
+ protected int getReplicaPort(Replica replica) {
+   String nodeName = replica.getNodeName();
+   // everything after the first ':' (the whole name when there is no ':')
+   String portText = nodeName.substring(nodeName.indexOf(':') + 1);
+   int underscoreAt = portText.indexOf('_');
+   if (underscoreAt != -1) {
+     portText = portText.substring(0, underscoreAt);
+   }
+   return Integer.parseInt(portText);
+ }
+
+ /**
+  * Finds the Jetty instance whose local port equals {@code port}, looking through
+  * {@code jettys} first and at {@code controlJetty} last. Fails the test when no
+  * instance matches.
+  */
+ protected JettySolrRunner getJettyOnPort(int port) {
+   for (JettySolrRunner candidate : jettys) {
+     if (candidate.getLocalPort() == port) {
+       return candidate;
+     }
+   }
+
+   // not one of the shard servers; the control server is the only one left to try
+   if (controlJetty.getLocalPort() == port) {
+     return controlJetty;
+   }
+
+   fail("Not able to find JettySolrRunner for port: "+port);
+   return null;
+ }
+
+ /**
+  * Looks up the SocketProxy registered for the replica's base URL. When nothing is
+  * registered under the URL as given and it does not end with '/', the lookup is
+  * retried with a trailing '/' appended. Fails the test if the replica has no base
+  * URL or no proxy is found.
+  */
+ protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+   String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+   assertNotNull(replicaBaseUrl);
+
+   URL lookupUrl = new URL(replicaBaseUrl);
+   SocketProxy found = proxies.get(lookupUrl.toURI());
+   if (found == null) {
+     String external = lookupUrl.toExternalForm();
+     if (!external.endsWith("/")) {
+       // second attempt: same URL with a trailing slash
+       lookupUrl = new URL(external + "/");
+       found = proxies.get(lookupUrl.toURI());
+     }
+   }
+
+   assertNotNull("No proxy found for " + lookupUrl + "!", found);
+   return found;
+ }
+
+ /**
+  * Returns a port that was free at the time of the call, found by briefly binding a
+  * server socket to port 0 and reading back the port it was given. The socket is
+  * closed before returning, so the port is not reserved for the caller.
+  */
+ protected int getNextAvailablePort() throws Exception {
+   try (ServerSocket probe = new ServerSocket(0)) {
+     return probe.getLocalPort();
+   }
+ }
+
private File getRelativeSolrHomePath(File solrHome) {
String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
String base = new File(solrHome.getPath()).getAbsolutePath();
@@ -1455,6 +1541,13 @@ public abstract class AbstractFullDistri
System.clearProperty("zkHost");
System.clearProperty("numShards");
+
+ // close socket proxies after super.tearDown
+ if (!proxies.isEmpty()) {
+ for (SocketProxy proxy : proxies.values()) {
+ proxy.close();
+ }
+ }
}
@Override
@@ -1769,4 +1862,92 @@ public abstract class AbstractFullDistri
baseServer.shutdown();
return r;
}
+
+ /**
+  * Polls the cluster state until every replica of the given shard reports the
+  * "active" state, then returns the replicas of that shard that are not its leader.
+  * <p>
+  * Fails the test when the collection does not have {@code shards} active slices,
+  * when the shard does not have exactly {@code rf} replicas or has no leader, when
+  * the replicas are not all active after {@code maxWaitSecs} seconds of polling, or
+  * when no non-leader replica could be identified.
+  *
+  * @param testCollectionName collection to inspect
+  * @param shardId            name of the shard (slice) whose replicas are checked
+  * @param shards             expected number of active slices in the collection
+  * @param rf                 expected number of replicas in the shard
+  * @param maxWaitSecs        upper bound on the time spent sleeping between polls, in seconds
+  * @return the non-leader replicas as seen in the last cluster state that was polled
+  * @throws Exception if the cluster state cannot be read or the waiting thread is interrupted
+  */
+ protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
+   long startMs = System.currentTimeMillis();
+
+   Map<String,Replica> notLeaders = new HashMap<String,Replica>();
+
+   ZkStateReader zkr = cloudClient.getZkStateReader();
+   zkr.updateClusterState(true); // force the state to be fresh
+
+   ClusterState cs = zkr.getClusterState();
+   Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
+   assertTrue(slices.size() == shards);
+   boolean allReplicasUp = false;
+   long waitMs = 0L;
+   long maxWaitMs = maxWaitSecs * 1000L;
+   while (waitMs < maxWaitMs && !allReplicasUp) {
+     // refresh state every 2 secs
+     if (waitMs % 2000 == 0)
+       cloudClient.getZkStateReader().updateClusterState(true);
+
+     cs = cloudClient.getZkStateReader().getClusterState();
+     assertNotNull(cs);
+     Slice shard = cs.getSlice(testCollectionName, shardId);
+     assertNotNull("No Slice for "+shardId, shard);
+     allReplicasUp = true; // assume true
+     Collection<Replica> replicas = shard.getReplicas();
+     assertTrue(replicas.size() == rf);
+     Replica leader = shard.getLeader();
+     assertNotNull(leader);
+     log.info("Found "+replicas.size()+" replicas and leader on "+
+         leader.getNodeName()+" for "+shardId+" in "+testCollectionName);
+
+     // start from scratch on every pass: if the leader changed since the previous
+     // poll, entries left over from that poll could include the current leader
+     notLeaders.clear();
+
+     // ensure all replicas are "active" and identify the non-leader replica
+     for (Replica replica : replicas) {
+       String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+       if (!ZkStateReader.ACTIVE.equals(replicaState)) {
+         log.info("Replica " + replica.getName() + " is currently " + replicaState);
+         allReplicasUp = false;
+       }
+
+       if (!leader.equals(replica))
+         notLeaders.put(replica.getName(), replica);
+     }
+
+     if (!allReplicasUp) {
+       // an interrupt is deliberately allowed to propagate: swallowing it would keep
+       // this loop polling for the full timeout after the test has been cancelled
+       Thread.sleep(500L);
+       waitMs += 500L;
+     }
+   } // end while
+
+   if (!allReplicasUp)
+     fail("Didn't see all replicas for shard "+shardId+" in "+testCollectionName+
+         " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
+
+   if (notLeaders.isEmpty())
+     fail("Didn't isolate any replicas that are not the leader! ClusterState: " + printClusterStateInfo());
+
+   long diffMs = (System.currentTimeMillis() - startMs);
+   log.info("Took " + diffMs + " ms to see all replicas become active.");
+
+   List<Replica> replicas = new ArrayList<Replica>();
+   replicas.addAll(notLeaders.values());
+   return replicas;
+ }
+
+ /**
+  * Returns a textual dump of every collection in the freshly reloaded cluster state.
+  */
+ protected String printClusterStateInfo() throws Exception {
+   return printClusterStateInfo(null);
+ }
+
+ /**
+  * Forces a refresh of the cluster state and renders it as text.
+  *
+  * @param collection name of the single collection to render via its
+  *                   {@code toString()}, or {@code null} to render all
+  *                   collections as JSON keyed by collection name
+  */
+ protected String printClusterStateInfo(String collection) throws Exception {
+   cloudClient.getZkStateReader().updateClusterState(true);
+   ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+
+   if (collection != null) {
+     return clusterState.getCollection(collection).toString();
+   }
+
+   // no specific collection requested: dump all of them
+   Map<String,DocCollection> collectionsByName = new HashMap<String,DocCollection>();
+   for (String name : clusterState.getCollections()) {
+     collectionsByName.put(name, clusterState.getCollection(name));
+   }
+   CharArr json = new CharArr();
+   new JSONWriter(json, 2).write(collectionsByName);
+   return json.toString();
+ }
}
Added: lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java?rev=1630196&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java Wed Oct 8 18:41:39 2014
@@ -0,0 +1,407 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kindly borrowed the idea and base implementation from the ActiveMQ project;
+ * useful for blocking traffic on a specified port.
+ * <p>
+ * The proxy listens on a local port and, for every accepted connection, opens a
+ * connection to the target and copies bytes in both directions. Closing the proxy
+ * drops all bridged connections and stops accepting new ones; {@link #reopen()}
+ * starts accepting again on the same port.
+ */
+public class SocketProxy {
+
+  private static final Logger log = LoggerFactory.getLogger(SocketProxy.class);
+
+  public static final int ACCEPT_TIMEOUT_MILLIS = 100;
+
+  private URI proxyUrl;
+  private URI target;
+
+  private Acceptor acceptor;
+  private ServerSocket serverSocket;
+
+  private CountDownLatch closed = new CountDownLatch(1);
+
+  // every access is guarded by synchronized (connections)
+  public List<Bridge> connections = new LinkedList<Bridge>();
+
+  private int listenPort = 0;
+
+  private int receiveBufferSize = -1;
+
+  private boolean pauseAtStart = false;
+
+  private int acceptBacklog = 50;
+
+  /** Creates a proxy that is not listening yet; set a target and call {@link #open()}. */
+  public SocketProxy() throws Exception {}
+
+  /** Creates and opens a proxy for {@code uri} on a port chosen by the system. */
+  public SocketProxy(URI uri) throws Exception {
+    this(0, uri);
+  }
+
+  /** Creates and opens a proxy for {@code uri} listening on {@code port}. */
+  public SocketProxy(int port, URI uri) throws Exception {
+    listenPort = port;
+    target = uri;
+    open();
+  }
+
+  @Override
+  public String toString() {
+    return "SocketProxy: port="+listenPort+"; target="+target;
+  }
+
+  public void setReceiveBufferSize(int receiveBufferSize) {
+    this.receiveBufferSize = receiveBufferSize;
+  }
+
+  public void setTarget(URI tcpBrokerUri) {
+    target = tcpBrokerUri;
+  }
+
+  /**
+   * Binds the listening socket and starts the acceptor thread. On the first call the
+   * proxy URL is derived from the target URI and the bound port; later calls bind to
+   * the port of that proxy URL again.
+   */
+  public void open() throws Exception {
+    serverSocket = createServerSocket(target);
+    serverSocket.setReuseAddress(true);
+    if (receiveBufferSize > 0) {
+      serverSocket.setReceiveBufferSize(receiveBufferSize);
+    }
+    if (proxyUrl == null) {
+      serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+      proxyUrl = urlFromSocket(target, serverSocket);
+    } else {
+      serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
+    }
+    acceptor = new Acceptor(serverSocket, target);
+    if (pauseAtStart) {
+      acceptor.pause();
+    }
+    new Thread(null, acceptor, "SocketProxy-Acceptor-"
+        + serverSocket.getLocalPort()).start();
+    closed = new CountDownLatch(1);
+  }
+
+  private boolean isSsl(URI target) {
+    return "ssl".equals(target.getScheme());
+  }
+
+  private ServerSocket createServerSocket(URI target) throws Exception {
+    if (isSsl(target)) {
+      return SSLServerSocketFactory.getDefault().createServerSocket();
+    }
+    return new ServerSocket();
+  }
+
+  private Socket createSocket(URI target) throws Exception {
+    if (isSsl(target)) {
+      return SSLSocketFactory.getDefault().createSocket();
+    }
+    return new Socket();
+  }
+
+  /** Closes a socket, ignoring a null argument and any failure to close. */
+  private static void closeQuietly(Socket socket) {
+    if (socket == null) {
+      return;
+    }
+    try {
+      socket.close();
+    } catch (IOException ignored) {
+      // nothing useful can be done about a failed close during cleanup
+    }
+  }
+
+  public URI getUrl() {
+    return proxyUrl;
+  }
+
+  /*
+   * close all proxy connections and acceptor
+   */
+  public void close() {
+    List<Bridge> connections;
+    synchronized (this.connections) {
+      // snapshot: Bridge.close() removes itself from the live list
+      connections = new ArrayList<Bridge>(this.connections);
+    }
+    log.warn("Closing " + connections.size()+" connections to: "+getUrl());
+    for (Bridge con : connections) {
+      closeConnection(con);
+    }
+    acceptor.close();
+    closed.countDown();
+  }
+
+  /*
+   * close all proxy receive connections, leaving acceptor open
+   */
+  public void halfClose() {
+    List<Bridge> connections;
+    synchronized (this.connections) {
+      connections = new ArrayList<Bridge>(this.connections);
+    }
+    log.info("halfClose, numConnections=" + connections.size());
+    for (Bridge con : connections) {
+      halfCloseConnection(con);
+    }
+  }
+
+  public boolean waitUntilClosed(long timeoutSeconds)
+      throws InterruptedException {
+    return closed.await(timeoutSeconds, TimeUnit.SECONDS);
+  }
+
+  /*
+   * called after a close to restart the acceptor on the same port
+   */
+  public void reopen() {
+    log.info("Re-opening connectivity to "+getUrl());
+    try {
+      open();
+    } catch (Exception e) {
+      // logged at error level: a failed reopen leaves the target unreachable
+      log.error("exception on reopen url:" + getUrl(), e);
+    }
+  }
+
+  /*
+   * pause accepting new connections and data transfer through existing proxy
+   * connections. All sockets remain open
+   */
+  public void pause() {
+    synchronized (connections) {
+      log.info("pause, numConnections=" + connections.size());
+      acceptor.pause();
+      for (Bridge con : connections) {
+        con.pause();
+      }
+    }
+  }
+
+  /*
+   * continue after pause
+   */
+  public void goOn() {
+    synchronized (connections) {
+      log.info("goOn, numConnections=" + connections.size());
+      for (Bridge con : connections) {
+        con.goOn();
+      }
+    }
+    acceptor.goOn();
+  }
+
+  private void closeConnection(Bridge c) {
+    try {
+      c.close();
+    } catch (Exception e) {
+      log.debug("exception on close of: " + c, e);
+    }
+  }
+
+  private void halfCloseConnection(Bridge c) {
+    try {
+      c.halfClose();
+    } catch (Exception e) {
+      log.debug("exception on half close of: " + c, e);
+    }
+  }
+
+  public boolean isPauseAtStart() {
+    return pauseAtStart;
+  }
+
+  public void setPauseAtStart(boolean pauseAtStart) {
+    this.pauseAtStart = pauseAtStart;
+  }
+
+  public int getAcceptBacklog() {
+    return acceptBacklog;
+  }
+
+  public void setAcceptBacklog(int acceptBacklog) {
+    this.acceptBacklog = acceptBacklog;
+  }
+
+  /** Copies {@code uri}, replacing its port with the port the server socket is bound to. */
+  private URI urlFromSocket(URI uri, ServerSocket serverSocket)
+      throws Exception {
+    int listenPort = serverSocket.getLocalPort();
+
+    return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
+        listenPort, uri.getPath(), uri.getQuery(), uri.getFragment());
+  }
+
+  /**
+   * One proxied connection: the socket accepted from the client, the socket opened
+   * to the target, and the two threads copying data between them.
+   */
+  public class Bridge {
+
+    private Socket receiveSocket;
+    private Socket sendSocket;
+    private Pump requestThread;
+    private Pump responseThread;
+
+    /**
+     * Connects to the target and starts pumping data in both directions. If the
+     * connection to the target cannot be set up, both sockets are closed before the
+     * exception is rethrown so that neither is leaked.
+     */
+    public Bridge(Socket socket, URI target) throws Exception {
+      receiveSocket = socket;
+      try {
+        sendSocket = createSocket(target);
+        if (receiveBufferSize > 0) {
+          sendSocket.setReceiveBufferSize(receiveBufferSize);
+        }
+        sendSocket.connect(new InetSocketAddress(target.getHost(), target
+            .getPort()));
+      } catch (Exception e) {
+        // this bridge never became usable and nobody else holds these sockets
+        closeQuietly(sendSocket);
+        closeQuietly(receiveSocket);
+        throw e;
+      }
+      linkWithThreads(receiveSocket, sendSocket);
+      log.info("proxy connection " + sendSocket + ", receiveBufferSize="
+          + sendSocket.getReceiveBufferSize());
+    }
+
+    public void goOn() {
+      responseThread.goOn();
+      requestThread.goOn();
+    }
+
+    public void pause() {
+      requestThread.pause();
+      responseThread.pause();
+    }
+
+    public void close() throws Exception {
+      synchronized (connections) {
+        connections.remove(this);
+      }
+      receiveSocket.close();
+      sendSocket.close();
+    }
+
+    public void halfClose() throws Exception {
+      receiveSocket.close();
+    }
+
+    private void linkWithThreads(Socket source, Socket dest) {
+      requestThread = new Pump(source, dest);
+      requestThread.start();
+      responseThread = new Pump(dest, source);
+      responseThread.start();
+    }
+
+    /** Thread that copies bytes from one socket to the other until EOF or failure. */
+    public class Pump extends Thread {
+
+      protected Socket src;
+      private Socket destination;
+      // latch with count 0 means "running"; count 1 means "paused"
+      private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+      public Pump(Socket source, Socket dest) {
+        super("SocketProxy-DataTransfer-" + source.getPort() + ":"
+            + dest.getPort());
+        src = source;
+        destination = dest;
+        pause.set(new CountDownLatch(0));
+      }
+
+      public void pause() {
+        pause.set(new CountDownLatch(1));
+      }
+
+      public void goOn() {
+        pause.get().countDown();
+      }
+
+      @Override
+      public void run() {
+        byte[] buf = new byte[1024];
+        try {
+          InputStream in = src.getInputStream();
+          OutputStream out = destination.getOutputStream();
+          while (true) {
+            int len = in.read(buf);
+            if (len == -1) {
+              log.debug("read eof from:" + src);
+              break;
+            }
+            // data already read is held back while the pump is paused
+            pause.get().await();
+            out.write(buf, 0, len);
+          }
+        } catch (Exception e) {
+          log.debug("read/write failed, reason: " + e.getLocalizedMessage());
+          try {
+            if (!receiveSocket.isClosed()) {
+              // for halfClose, on read/write failure if we close the
+              // remote end will see a close at the same time.
+              close();
+            }
+          } catch (Exception ignore) {}
+        }
+      }
+    }
+  }
+
+  /**
+   * Accept loop for the listening socket; creates a {@link Bridge} for every
+   * accepted connection until the socket is closed or an error occurs.
+   */
+  public class Acceptor implements Runnable {
+
+    private ServerSocket socket;
+    private URI target;
+    // latch with count 0 means "running"; count 1 means "paused"
+    private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
+
+    public Acceptor(ServerSocket serverSocket, URI uri) {
+      socket = serverSocket;
+      target = uri;
+      pause.set(new CountDownLatch(0));
+      try {
+        // accept() must time out so the loop can notice that the socket was closed
+        socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS);
+      } catch (SocketException e) {
+        log.warn("Could not set accept timeout on " + socket, e);
+      }
+    }
+
+    public void pause() {
+      pause.set(new CountDownLatch(1));
+    }
+
+    public void goOn() {
+      pause.get().countDown();
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (!socket.isClosed()) {
+          pause.get().await();
+          try {
+            Socket source = socket.accept();
+            pause.get().await();
+            if (receiveBufferSize > 0) {
+              source.setReceiveBufferSize(receiveBufferSize);
+            }
+            log.info("accepted " + source + ", receiveBufferSize:"
+                + source.getReceiveBufferSize());
+            synchronized (connections) {
+              connections.add(new Bridge(source, target));
+            }
+          } catch (SocketTimeoutException expected) {}
+        }
+      } catch (Exception e) {
+        log.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+      }
+    }
+
+    public void close() {
+      try {
+        socket.close();
+        closed.countDown();
+        // release a paused accept loop so it can see the closed socket and exit
+        goOn();
+      } catch (IOException ignored) {}
+    }
+  }
+
+}
+