You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by th...@apache.org on 2014/10/08 20:41:39 UTC

svn commit: r1630196 - in /lucene/dev/branches/lucene_solr_4_10/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/ test-framework/src/java/org/apache/solr/cloud/

Author: thelabdude
Date: Wed Oct  8 18:41:39 2014
New Revision: 1630196

URL: http://svn.apache.org/r1630196
Log:
SOLR-6511: backport to 4.10 branch

Added:
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java   (with props)
    lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java   (with props)
Removed:
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
Modified:
    lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
    lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java

Modified: lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/CHANGES.txt Wed Oct  8 18:41:39 2014
@@ -19,6 +19,12 @@ See the tutorial at http://lucene.apache
 
 ==================  4.10.2 ==================
 
+Bug Fixes
+----------------------
+
+* SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)
+
+
 Other Changes
 ----------------------
 
@@ -296,6 +302,7 @@ Optimizations
 * SOLR-6261: Run ZooKeeper watch event callbacks in parallel to the ZooKeeper
   event thread. (Ramkumar Aiyengar via Mark Miller)
 
+
 Other Changes
 ---------------------
 
@@ -438,7 +445,6 @@ New Features
 * SOLR-6064: DebugComponent track output should be returned as a JSON
   object rather than a list (Christine Poerschke, Alan Woodward)
 
-
 Bug Fixes
 ----------------------
 

Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Oct  8 18:41:39 2014
@@ -382,7 +382,8 @@ final class ShardLeaderElectionContext e
                                                     collection,
                                                     shardId,
                                                     coreNodeProps,
-                                                    120);
+                                                    120,
+                                                    coreNodeName);
               zkController.ensureReplicaInLeaderInitiatedRecovery(
                   collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
               

Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Oct  8 18:41:39 2014
@@ -50,13 +50,15 @@ public class LeaderInitiatedRecoveryThre
   protected String shardId;
   protected ZkCoreNodeProps nodeProps;
   protected int maxTries;
+  protected String leaderCoreNodeName;
   
   public LeaderInitiatedRecoveryThread(ZkController zkController, 
                                        CoreContainer cc, 
                                        String collection, 
                                        String shardId, 
                                        ZkCoreNodeProps nodeProps,
-                                       int maxTries)
+                                       int maxTries,
+                                       String leaderCoreNodeName)
   {
     super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
     this.zkController = zkController;
@@ -65,6 +67,7 @@ public class LeaderInitiatedRecoveryThre
     this.shardId = shardId;    
     this.nodeProps = nodeProps;
     this.maxTries = maxTries;
+    this.leaderCoreNodeName = leaderCoreNodeName;
     
     setDaemon(true);
   }
@@ -103,7 +106,7 @@ public class LeaderInitiatedRecoveryThre
     recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
     recoverRequestCmd.setCoreName(coreNeedingRecovery);
     
-    while (continueTrying && ++tries < maxTries) {
+    while (continueTrying && ++tries <= maxTries) {
       if (tries > 1) {
         log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
             " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
@@ -170,6 +173,24 @@ public class LeaderInitiatedRecoveryThre
           break;
         }
 
+        // stop trying if I'm no longer the leader
+        if (leaderCoreNodeName != null && collection != null) {
+          String leaderCoreNodeNameFromZk = null;
+          try {
+            leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
+          } catch (Exception exc) {
+            log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
+                " " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
+          }
+          if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
+            log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+                ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+                leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
+            continueTrying = false;
+            break;
+          }
+        }
+
         // additional safeguard against the replica trying to be in the active state
         // before acknowledging the leader initiated recovery command
         if (continueTrying && collection != null && shardId != null) {

Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Oct  8 18:41:39 2014
@@ -30,6 +30,7 @@ import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -74,6 +75,8 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.data.Stat;
+import org.noggit.JSONParser;
+import org.noggit.ObjectBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1884,16 +1887,19 @@ public final class ZkController {
   }  
   
   public String getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
-    
+    Map<String,Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+    return (stateObj != null) ? (String)stateObj.get("state") : null;
+  }
+
+  public Map<String,Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
+
     if (collection == null || shardId == null || coreNodeName == null)
       return null; // if we don't have complete data about a core in cloud mode, return null
     
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
-    String state = null;
+    byte[] stateData = null;
     try {
-      byte[] data = zkClient.getData(znodePath, null, new Stat(), false);
-      if (data != null && data.length > 0)
-        state = new String(data, "UTF-8");
+      stateData = zkClient.getData(znodePath, null, new Stat(), false);
     } catch (NoNodeException ignoreMe) {
       // safe to ignore as this znode will only exist if the leader initiated recovery
     } catch (ConnectionLossException cle) {
@@ -1904,8 +1910,6 @@ public final class ZkController {
       // sort of safe to ignore ??? Usually these are seen when the core is going down
       // or there are bigger issues to deal with than reading this znode
       log.warn("Unable to read "+znodePath+" due to: "+see);
-    } catch (UnsupportedEncodingException e) {
-      throw new Error("JVM Does not seem to support UTF-8", e);
     } catch (Exception exc) {
       log.error("Failed to read data from znode "+znodePath+" due to: "+exc);
       if (exc instanceof SolrException) {
@@ -1915,9 +1919,14 @@ public final class ZkController {
             "Failed to read data from znodePath: "+znodePath, exc);
       }
     }
-    return state;
+
+    Map<String,Object> stateObj = null;
+    if (stateData != null && stateData.length > 0)
+      stateObj = (Map<String, Object>) ZkStateReader.fromJSON(stateData);
+
+    return stateObj;
   }
-  
+
   private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
@@ -1926,7 +1935,7 @@ public final class ZkController {
     }
 
     String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
-    
+
     if (ZkStateReader.ACTIVE.equals(state)) {
       // since we're marking it active, we don't need this znode anymore, so delete instead of update
       try {
@@ -1936,14 +1945,22 @@ public final class ZkController {
       }
       return;
     }
-    
-    byte[] znodeData = null;
+
+    Map<String,Object> stateObj = null;
     try {
-      znodeData = state.getBytes("UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      throw new Error("JVM Does not seem to support UTF-8", e);
+      stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
+    } catch (Exception exc) {
+      log.warn(exc.getMessage(), exc);
     }
+    if (stateObj == null)
+      stateObj = new LinkedHashMap<String,Object>();
 
+    stateObj.put("state", state);
+    // only update the createdBy value if it is not already set
+    if (stateObj.get("createdByNodeName") == null)
+      stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
+
+    byte[] znodeData = ZkStateReader.toJSON(stateObj);
     boolean retryOnConnLoss = true; // be a little more robust when trying to write data
     try {
       if (zkClient.exists(znodePath, retryOnConnLoss)) {
@@ -1956,8 +1973,8 @@ public final class ZkController {
       if (exc instanceof SolrException) {
         throw (SolrException)exc;
       } else {
-        throw new SolrException(ErrorCode.SERVER_ERROR, 
-            "Failed to update data to "+state+" for znode: "+znodePath, exc);        
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Failed to update data to "+state+" for znode: "+znodePath, exc);
       }
     }
   }

Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Oct  8 18:41:39 2014
@@ -593,7 +593,9 @@ public class DistributedUpdateProcessor 
         String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
         if (fromCollection == null)  {
           log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
-          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+          SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+          solrExc.setMetadata("cause", "LeaderChanged");
+          throw solrExc;
         }
       }
     }
@@ -803,57 +805,91 @@ public class DistributedUpdateProcessor 
           DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));       
       if (phase != DistribPhase.FROMLEADER)
         continue; // don't have non-leaders try to recovery other nodes
-      
-      final String replicaUrl = error.req.node.getUrl();      
+
+      final String replicaUrl = error.req.node.getUrl();
+
+      // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
+      String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+      if ("LeaderChanged".equals(cause)) {
+        // let's just fail this request and let the client retry? or just call processAdd again?
+        log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
+            " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+        rsp.setException(error.e);
+        break;
+      }
 
       int maxTries = 1;       
       boolean sendRecoveryCommand = true;
       String collection = null;
       String shardId = null;
-      
+
       if (error.req.node instanceof StdNode) {
         StdNode stdNode = (StdNode)error.req.node;
         collection = stdNode.getCollection();
         shardId = stdNode.getShardId();
+
+        // before we go setting other replicas to down, make sure we're still the leader!
+        String leaderCoreNodeName = null;
         try {
-          // if false, then the node is probably not "live" anymore
-          sendRecoveryCommand = 
-              zkController.ensureReplicaInLeaderInitiatedRecovery(collection, 
-                                                                  shardId, 
-                                                                  replicaUrl, 
-                                                                  stdNode.getNodeProps(), 
-                                                                  false);
-          
-          // we want to try more than once, ~10 minutes
-          if (sendRecoveryCommand) {
-            maxTries = 120;
-          } // else the node is no longer "live" so no need to send any recovery command
-          
-        } catch (Exception e) {
-          log.error("Leader failed to set replica "+
-              error.req.node.getUrl()+" state to DOWN due to: "+e, e);
+          leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+        } catch (Exception exc) {
+          log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
+              " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+        }
+
+        if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
+          try {
+            // if false, then the node is probably not "live" anymore
+            sendRecoveryCommand =
+                zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
+                    shardId,
+                    replicaUrl,
+                    stdNode.getNodeProps(),
+                    false);
+
+            // we want to try more than once, ~10 minutes
+            if (sendRecoveryCommand) {
+              maxTries = 120;
+            } // else the node is no longer "live" so no need to send any recovery command
+          } catch (Exception exc) {
+            Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
+            log.error("Leader failed to set replica " +
+                error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
+            if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException ||
+                setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException) {
+              // our ZK session expired or the connection was lost, so our state is suspect; don't go
+              // putting other replicas in recovery (see SOLR-6511)
+              sendRecoveryCommand = false;
+            } // else will go ahead and try to send the recovery command once after this error
+          }
+        } else {
+          // not the leader anymore maybe?
+          sendRecoveryCommand = false;
+          log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
+              shardId+", no request recovery command will be sent!");
         }
       } // else not a StdNode, recovery command still gets sent once
             
       if (!sendRecoveryCommand)
         continue; // the replica is already in recovery handling or is not live   
-      
-      Throwable rootCause = SolrException.getRootCause(error.e);      
-      log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
-      
+
+      Throwable rootCause = SolrException.getRootCause(error.e);
+      log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+
       // try to send the recovery command to the downed replica in a background thread
-      CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();      
-      LeaderInitiatedRecoveryThread lirThread = 
+      CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+      LeaderInitiatedRecoveryThread lirThread =
           new LeaderInitiatedRecoveryThread(zkController,
-                                            coreContainer,
-                                            collection,
-                                            shardId,
-                                            error.req.node.getNodeProps(),
-                                            maxTries);
+              coreContainer,
+              collection,
+              shardId,
+              error.req.node.getNodeProps(),
+              maxTries,
+              cloudDesc.getCoreNodeName()); // core node name of current leader
       ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
-      executor.execute(lirThread);      
+      executor.execute(lirThread);
     }
-    
+
     if (replicationTracker != null) {
       rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
       rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);

Modified: lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Oct  8 18:41:39 2014
@@ -18,17 +18,15 @@ package org.apache.solr.cloud;
  */
 
 import java.io.File;
-import java.net.ServerSocket;
-import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -36,6 +34,8 @@ import org.apache.solr.client.solrj.embe
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
@@ -52,26 +52,24 @@ import org.slf4j.LoggerFactory;
  * Simulates HTTP partitions between a leader and replica but the replica does
  * not lose its ZooKeeper connection.
  */
+
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
 public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   
-  private static final transient Logger log = 
+  protected static final transient Logger log =
       LoggerFactory.getLogger(HttpPartitionTest.class);
   
   // To prevent the test assertions firing too fast before cluster state
   // recognizes (and propagates) partitions
-  private static final long sleepMsBeforeHealPartition = 2000L;
-  
-  private static final int maxWaitSecsToSeeAllActive = 30;
-  
-  private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-  
+  protected static final long sleepMsBeforeHealPartition = 2000L;
+
+  protected static final int maxWaitSecsToSeeAllActive = 30;
+
   public HttpPartitionTest() {
     super();
     sliceCount = 2;
-    shardCount = 2;
+    shardCount = 3;
   }
   
   @Before
@@ -84,73 +82,45 @@ public class HttpPartitionTest extends A
   @Override
   @After
   public void tearDown() throws Exception {    
-    System.clearProperty("numShards");
-    
     try {
       super.tearDown();
     } catch (Exception exc) {}
     
     resetExceptionIgnores();
-    
-    // close socket proxies after super.tearDown
-    if (!proxies.isEmpty()) {
-      for (SocketProxy proxy : proxies.values()) {
-        proxy.close();
-      }
-    }    
   }
   
   /**
-   * Overrides the parent implementation so that we can configure a socket proxy
-   * to sit infront of each Jetty server, which gives us the ability to simulate
-   * network partitions without having to fuss with IPTables (which is not very
-   * cross platform friendly).
+   * Overrides the parent implementation to install a SocketProxy in front of the Jetty server.
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
       String shardList, String solrConfigOverride, String schemaOverride)
-      throws Exception {
-    
-    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
-        0, solrConfigOverride, schemaOverride, false,
-        getExtraServlets(), sslConfig, getExtraRequestFilters());
-    jetty.setShards(shardList);
-    jetty.setDataDir(getDataDir(dataDir));      
-    
-    // setup to proxy Http requests to this server unless it is the control
-    // server
-    int proxyPort = getNextAvailablePort();
-    jetty.setProxyPort(proxyPort);        
-    jetty.start();        
-    
-    // create a socket proxy for the jetty server ...
-    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
-    proxies.put(proxy.getUrl(), proxy);
-    
-    return jetty;
+      throws Exception
+  {
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
   }
-  
-  protected int getNextAvailablePort() throws Exception {    
-    int port = -1;
-    try (ServerSocket s = new ServerSocket(0)) {
-      port = s.getLocalPort();
-    }
-    return port;
-  }
-   
+
   @Override
   public void doTest() throws Exception {
     waitForThingsToLevelOut(30000);
-    
+
     // test a 1x2 collection
     testRf2();
 
+    waitForThingsToLevelOut(30000);
+
     // now do similar for a 1x3 collection while taking 2 replicas on-and-off
     // each time
     testRf3();
 
-    // kill a leader and make sure recovery occurs as expected
-    testRf3WithLeaderFailover();    
+    waitForThingsToLevelOut(30000);
+
+    // have the leader lose its Zk session temporarily
+    testLeaderZkSessionLoss();
+
+    waitForThingsToLevelOut(30000);
+
+    log.info("HttpParitionTest succeeded ... shutting down now!");
   }
   
   protected void testRf2() throws Exception {
@@ -162,7 +132,7 @@ public class HttpPartitionTest extends A
     sendDoc(1);
     
     Replica notLeader = 
-        ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive).get(0);
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
     
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy = getProxyForReplica(notLeader);
@@ -184,7 +154,7 @@ public class HttpPartitionTest extends A
     proxy.reopen();
     
     List<Replica> notLeaders = 
-        ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive);
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
     
     sendDoc(3);
     
@@ -216,11 +186,13 @@ public class HttpPartitionTest extends A
       proxy.reopen();
     }
     
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, 2, maxWaitSecsToSeeAllActive);
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
     
     // verify all docs received
     assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, numDocs + 3);
 
+    log.info("testRf2 succeeded ... deleting the "+testCollectionName+" collection");
+
     // try to clean up
     try {
       CollectionAdminRequest req = new CollectionAdminRequest.Delete();
@@ -241,14 +213,12 @@ public class HttpPartitionTest extends A
     sendDoc(1);
     
     List<Replica> notLeaders = 
-        ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
     assertTrue("Expected 2 replicas for collection " + testCollectionName
         + " but found " + notLeaders.size() + "; clusterState: "
-        + printClusterStateInfo(),
+        + printClusterStateInfo(testCollectionName),
         notLeaders.size() == 2);
-    
-    sendDoc(1);
-    
+
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
     
@@ -271,11 +241,14 @@ public class HttpPartitionTest extends A
     proxy1.reopen();
     
     // sent 4 docs in so far, verify they are on the leader and replica
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); 
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive); 
     
     sendDoc(4);
     
     assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);
+
+    log.info("testRf3 succeeded ... deleting the "+testCollectionName+" collection");
+
     // try to clean up
     try {
       CollectionAdminRequest req = new CollectionAdminRequest.Delete();
@@ -286,127 +259,103 @@ public class HttpPartitionTest extends A
       log.warn("Could not delete collection {} after test completed", testCollectionName);
     }
   }
-  
-  protected void testRf3WithLeaderFailover() throws Exception {
-    // now let's create a partition in one of the replicas and outright
-    // kill the leader ... see what happens
-    // create a collection that has 1 shard but 3 replicas
-    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
-    createCollection(testCollectionName, 1, 3, 1);
+
+  // test inspired by SOLR-6511
+  protected void testLeaderZkSessionLoss() throws Exception {
+
+    String testCollectionName = "c8n_1x2_leader_session_loss";
+    createCollection(testCollectionName, 1, 2, 1);
     cloudClient.setDefaultCollection(testCollectionName);
-    
-    sendDoc(1);
-    
-    List<Replica> notLeaders = 
-        ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);
-    assertTrue("Expected 2 replicas for collection " + testCollectionName
-        + " but found " + notLeaders.size() + "; clusterState: "
-        + printClusterStateInfo(),
-        notLeaders.size() == 2);
-        
+
     sendDoc(1);
-    
-    // ok, now introduce a network partition between the leader and the replica
-    SocketProxy proxy0 = null;
-    proxy0 = getProxyForReplica(notLeaders.get(0));
-    
-    proxy0.close();
-    
-    // indexing during a partition
-    sendDoc(2);
-    
-    Thread.sleep(sleepMsBeforeHealPartition);
-    
-    proxy0.reopen();
-    
-    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-    
-    proxy1.close();
-    
-    sendDoc(3);
-    
-    Thread.sleep(sleepMsBeforeHealPartition);
-    proxy1.reopen();
-    
-    // sent 4 docs in so far, verify they are on the leader and replica
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive); 
-    
-    sendDoc(4);
-    
-    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);    
-        
-    Replica leader = 
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 1 replicas for collection " + testCollectionName
+            + " but found " + notLeaders.size() + "; clusterState: "
+            + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 1);
+
+    Replica leader =
         cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
     String leaderNode = leader.getNodeName();
     assertNotNull("Could not find leader for shard1 of "+
-      testCollectionName+"; clusterState: "+printClusterStateInfo(), leader);
+        testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
     JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-    
-    // since maxShardsPerNode is 1, we're safe to kill the leader
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, 3, maxWaitSecsToSeeAllActive);    
-    proxy0 = getProxyForReplica(notLeaders.get(0));
-    proxy0.close();
-        
-    // indexing during a partition
-    // doc should be on leader and 1 replica
-    sendDoc(5);
-    
-    Thread.sleep(sleepMsBeforeHealPartition);
-    
-    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
 
-    // kill the leader
-    leaderJetty.stop();
-    
-    if (leaderJetty.isRunning())
-      fail("Failed to stop the leader on "+leaderNode);
-        
-    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
-    if (oldLeaderProxy != null) {
-      oldLeaderProxy.close();      
-    } else {
-      log.warn("No SocketProxy found for old leader node "+leaderNode);      
-    }
+    HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField(id, String.valueOf(2));
+    doc.addField("a_t", "hello" + 2);
 
-    Thread.sleep(10000); // give chance for new leader to be elected.
-    
-    Replica newLeader = 
-        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
-        
-    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
-      printClusterStateInfo(),newLeader);
-        
-    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
-        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
-        printClusterStateInfo(), 
-        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
-    
-    proxy0.reopen();
-    
+    // cause leader migration by expiring the current leader's zk session
+    chaosMonkey.expireSession(leaderJetty);
+
+    String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
     long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
     while (System.nanoTime() < timeout) {
-      cloudClient.getZkStateReader().updateClusterState(true);
+      String currentLeaderName = null;
+      try {
+        Replica currentLeader =
+            cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+        currentLeaderName = currentLeader.getName();
+      } catch (Exception exc) {}
+
+      if (expectedNewLeaderCoreNodeName.equals(currentLeaderName))
+        break; // new leader was elected after zk session expiration
 
-      List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
-      if (activeReps.size() == 2) break;
-      Thread.sleep(1000);
+      Thread.sleep(500);
     }
 
-    List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
-    assertTrue("Expected 2 of 3 replicas to be active but only found "+
-            activeReps.size()+"; "+activeReps+"; clusterState: "+printClusterStateInfo(),
-        activeReps.size() == 2);
+    Replica currentLeader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertEquals(expectedNewLeaderCoreNodeName, currentLeader.getName());
 
-    sendDoc(6);
+    // TODO: This test logic seems to be timing dependent and fails on Jenkins
+    // need to come up with a better approach
+    log.info("Sending doc 2 to old leader "+leader.getName());
+    try {
+      leaderSolr.add(doc);
+      leaderSolr.shutdown();
 
-    assertDocsExistInAllReplicas(activeReps, testCollectionName, 1, 6);
-  }
-  
-  protected String printClusterStateInfo() throws Exception {
-    cloudClient.getZkStateReader().updateClusterState(true);
-    return String.valueOf(cloudClient.getZkStateReader().getClusterState());
+      // if the add worked, then the doc must exist on the new leader
+      HttpSolrServer newLeaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+      try {
+        assertDocExists(newLeaderSolr, testCollectionName, "2");
+      } finally {
+        newLeaderSolr.shutdown();
+      }
+
+    } catch (SolrException exc) {
+      // this is ok provided the doc doesn't exist on the current leader
+      leaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+      try {
+        leaderSolr.add(doc); // this should work
+      } finally {
+        leaderSolr.shutdown();
+      }
+    }
+
+    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    Set<String> replicasToCheck = new HashSet<>();
+    for (Replica stillUp : participatingReplicas)
+      replicasToCheck.add(stillUp.getName());
+    waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 2);
+
+    log.info("testLeaderZkSessionLoss succeeded ... deleting the "+testCollectionName+" collection");
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
   }
-  
+
   protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {    
     Map<String,Replica> activeReplicas = new HashMap<String,Replica>();    
     ZkStateReader zkr = cloudClient.getZkStateReader();
@@ -426,21 +375,7 @@ public class HttpPartitionTest extends A
     replicas.addAll(activeReplicas.values());
     return replicas;
   }  
-  
-  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
-    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-    assertNotNull(replicaBaseUrl);
-    URL baseUrl = new URL(replicaBaseUrl);
-    
-    SocketProxy proxy = proxies.get(baseUrl.toURI());
-    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
-      baseUrl = new URL(baseUrl.toExternalForm() + "/");
-      proxy = proxies.get(baseUrl.toURI());
-    }
-    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
-    return proxy;
-  }
-  
+
   protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
       String testCollectionName, int firstDocId, int lastDocId)
       throws Exception {
@@ -478,112 +413,84 @@ public class HttpPartitionTest extends A
   }
   
   protected void sendDoc(int docId) throws Exception {
+    UpdateRequest up = new UpdateRequest();
+    up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(2));
     SolrInputDocument doc = new SolrInputDocument();
     doc.addField(id, String.valueOf(docId));
     doc.addField("a_t", "hello" + docId);
-    cloudClient.add(doc);
-  }
-  
-  protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, int rf, int maxWaitSecs) throws Exception {
-    long startMs = System.currentTimeMillis();
-    
-    Map<String,Replica> notLeaders = new HashMap<String,Replica>();
-    
-    ZkStateReader zkr = cloudClient.getZkStateReader();
-    ClusterState cs = zkr.getClusterState();
-    Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
-    assertTrue(slices.size() == 1); // shards == 1
-    boolean allReplicasUp = false;
-    long waitMs = 0L;
-    long maxWaitMs = maxWaitSecs * 1000L;
-    Replica leader = null;
-    while (waitMs < maxWaitMs && !allReplicasUp) {
-      cs = zkr.getClusterState();
-      assertNotNull(cs);
-      for (Slice shard : cs.getActiveSlices(testCollectionName)) {
-        allReplicasUp = true; // assume true
-        Collection<Replica> replicas = shard.getReplicas();
-        assertTrue(replicas.size() == rf);
-        leader = shard.getLeader();
-        assertNotNull(leader);
-        
-        // ensure all replicas are "active" and identify the non-leader replica
-        for (Replica replica : replicas) {
-          String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
-          if (!ZkStateReader.ACTIVE.equals(replicaState)) {
-            log.info("Replica " + replica.getName() + " is currently " + replicaState);
-            allReplicasUp = false;
-          }
-          
-          if (!leader.equals(replica)) 
-            notLeaders.put(replica.getName(), replica);
-        }
-        
-        if (!allReplicasUp) {
-          try {
-            Thread.sleep(500L);
-          } catch (Exception ignoreMe) {}
-          waitMs += 500L;
-        }
-      }
-    } // end while
-    
-    if (!allReplicasUp) 
-      fail("Didn't see all replicas come up within " + maxWaitMs + 
-          " ms! ClusterState: " + printClusterStateInfo());
-    
-    if (notLeaders.isEmpty()) 
-      fail("Didn't isolate any replicas that are not the leader! ClusterState: " + 
-         printClusterStateInfo());
-    
-    long diffMs = (System.currentTimeMillis() - startMs);
-    log.info("Took " + diffMs + " ms to see all replicas become active.");
-    
-    List<Replica> replicas = new ArrayList<Replica>();
-    replicas.addAll(notLeaders.values());
-    return replicas;
+    up.add(doc);
+    int minAchievedRf =
+        cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
   }
   
   /**
    * Query the real-time get handler for a specific doc by ID to verify it
-   * exists in the provided server.
+   * exists in the provided server, using distrib=false so it doesn't route to another replica.
    */
   @SuppressWarnings("rawtypes")
   protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
-    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId));
+    QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
     NamedList rsp = solr.request(qr);
-    String match = 
-        JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
+    String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
     assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
-        + " due to: " + match, match == null);
+        + " due to: " + match + "; rsp="+rsp, match == null);
   }
-  
-  protected JettySolrRunner getJettyOnPort(int port) {
-    JettySolrRunner theJetty = null;
-    for (JettySolrRunner jetty : jettys) {
-      if (port == jetty.getLocalPort()) {
-        theJetty = jetty;
-        break;
-      }
-    }    
-    
-    if (theJetty == null) {
-      if (controlJetty.getLocalPort() == port) {
-        theJetty = controlJetty;
-      }
-    }
-    
-    if (theJetty == null)
-      fail("Not able to find JettySolrRunner for port: "+port);
-    
-    return theJetty;
-  }
-  
+
   protected int getReplicaPort(Replica replica) {
     String replicaNode = replica.getNodeName();    
     String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
     if (tmp.indexOf('_') != -1)
       tmp = tmp.substring(0,tmp.indexOf('_'));
     return Integer.parseInt(tmp);    
-  }  
+  }
+
+  protected void waitToSeeReplicasActive(String testCollectionName, String shardId, Set<String> replicasToCheck, int maxWaitSecs) throws Exception {
+    long startMs = System.currentTimeMillis();
+
+    ZkStateReader zkr = cloudClient.getZkStateReader();
+    zkr.updateClusterState(true); // force the state to be fresh
+
+    ClusterState cs = zkr.getClusterState();
+    Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
+    boolean allReplicasUp = false;
+    long waitMs = 0L;
+    long maxWaitMs = maxWaitSecs * 1000L;
+    while (waitMs < maxWaitMs && !allReplicasUp) {
+      // refresh state every 2 secs
+      if (waitMs % 2000 == 0)
+        cloudClient.getZkStateReader().updateClusterState(true);
+
+      cs = cloudClient.getZkStateReader().getClusterState();
+      assertNotNull(cs);
+      Slice shard = cs.getSlice(testCollectionName, shardId);
+      assertNotNull("No Slice for "+shardId, shard);
+      allReplicasUp = true; // assume true
+
+      // wait to see all replicas are "active"
+      for (Replica replica : shard.getReplicas()) {
+        if (!replicasToCheck.contains(replica.getName()))
+          continue;
+
+        String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+        if (!ZkStateReader.ACTIVE.equals(replicaState)) {
+          log.info("Replica " + replica.getName() + " is currently " + replicaState);
+          allReplicasUp = false;
+        }
+      }
+
+      if (!allReplicasUp) {
+        try {
+          Thread.sleep(1000L);
+        } catch (Exception ignoreMe) {}
+        waitMs += 1000L;
+      }
+    } // end while
+
+    if (!allReplicasUp)
+      fail("Didn't see replicas "+ replicasToCheck +
+          " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo(testCollectionName));
+
+    long diffMs = (System.currentTimeMillis() - startMs);
+    log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");
+  }
 }

Added: lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java?rev=1630196&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java Wed Oct  8 18:41:39 2014
@@ -0,0 +1,182 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.Replica;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests leader-initiated recovery scenarios after a leader node fails
+ * and one of the replicas is out-of-sync.
+ */
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {
+
+  public LeaderFailoverAfterPartitionTest() {
+    super();
+  }
+
+
+  @Override
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(30000);
+
+    // kill a leader and make sure recovery occurs as expected
+    testRf3WithLeaderFailover();
+  }
+
+  protected void testRf3WithLeaderFailover() throws Exception {
+    // now let's create a partition in one of the replicas and outright
+    // kill the leader ... see what happens
+    // create a collection that has 1 shard but 3 replicas
+    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+    
+    sendDoc(1);
+    
+    List<Replica> notLeaders = 
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 2);
+        
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy0 = null;
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    
+    proxy0.close();
+    
+    // indexing during a partition
+    sendDoc(2);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    proxy0.reopen();
+    
+    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+    
+    proxy1.close();
+    
+    sendDoc(3);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    proxy1.reopen();
+    
+    // sent 3 docs in so far, verify they are on the leader and replica
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive); 
+    
+    sendDoc(4);
+    
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);    
+        
+    Replica leader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    String leaderNode = leader.getNodeName();
+    assertNotNull("Could not find leader for shard1 of "+
+      testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    
+    // since maxShardsPerNode is 1, we're safe to kill the leader
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);    
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    proxy0.close();
+        
+    // indexing during a partition
+    // doc should be on leader and 1 replica
+    sendDoc(5);
+
+    assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
+    assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+
+    //chaosMonkey.expireSession(leaderJetty);
+    // kill the leader
+    leaderJetty.stop();
+    if (leaderJetty.isRunning())
+      fail("Failed to stop the leader on "+leaderNode);
+        
+    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
+    if (oldLeaderProxy != null) {
+      oldLeaderProxy.close();      
+    } else {
+      log.warn("No SocketProxy found for old leader node "+leaderNode);      
+    }
+
+    Thread.sleep(10000); // give chance for new leader to be elected.
+    
+    Replica newLeader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
+        
+    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
+      printClusterStateInfo(testCollectionName),newLeader);
+        
+    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
+        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
+        printClusterStateInfo(testCollectionName),
+        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
+    
+    proxy0.reopen();
+    
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
+    while (System.nanoTime() < timeout) {
+      cloudClient.getZkStateReader().updateClusterState(true);
+
+      List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+      if (activeReps.size() >= 2) break;
+      Thread.sleep(1000);
+    }
+
+    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    assertTrue("Expected 2 of 3 replicas to be active but only found "+
+            participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+
+            printClusterStateInfo(testCollectionName),
+        participatingReplicas.size() >= 2);
+
+    sendDoc(6);
+
+    Set<String> replicasToCheck = new HashSet<>();
+    for (Replica stillUp : participatingReplicas)
+      replicasToCheck.add(stillUp.getName());
+    waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
+  }
+}
+

Modified: lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1630196&r1=1630195&r2=1630196&view=diff
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Wed Oct  8 18:41:39 2014
@@ -25,9 +25,12 @@ import static org.apache.solr.common.clo
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -78,6 +81,8 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +135,8 @@ public abstract class AbstractFullDistri
   private boolean cloudInit;
   protected boolean checkCreatedVsState;
   protected boolean useJettyDataDir = true;
+
+  protected Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
   
   public static class CloudJettyRunner {
     public JettySolrRunner jetty;
@@ -503,6 +510,85 @@ public abstract class AbstractFullDistri
     return jetty;
   }
 
+  /**
+   * Creates a JettySolrRunner with a socket proxy sitting in front of the Jetty server,
+   * which gives us the ability to simulate network partitions without having to fuss
+   * with IPTables.
+   */
+  public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
+                                            String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception {
+
+    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
+        0, solrConfigOverride, schemaOverride, false,
+        getExtraServlets(), sslConfig, getExtraRequestFilters());
+    jetty.setShards(shardList);
+    jetty.setDataDir(getDataDir(dataDir));
+
+    // setup to proxy Http requests to this server unless it is the control
+    // server
+    int proxyPort = getNextAvailablePort();
+    jetty.setProxyPort(proxyPort);
+    jetty.start();
+
+    // create a socket proxy for the jetty server ...
+    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+    proxies.put(proxy.getUrl(), proxy);
+
+    return jetty;
+  }
+
+  protected int getReplicaPort(Replica replica) {
+    String replicaNode = replica.getNodeName();
+    String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
+    if (tmp.indexOf('_') != -1)
+      tmp = tmp.substring(0,tmp.indexOf('_'));
+    return Integer.parseInt(tmp);
+  }
+
+  protected JettySolrRunner getJettyOnPort(int port) {
+    JettySolrRunner theJetty = null;
+    for (JettySolrRunner jetty : jettys) {
+      if (port == jetty.getLocalPort()) {
+        theJetty = jetty;
+        break;
+      }
+    }
+
+    if (theJetty == null) {
+      if (controlJetty.getLocalPort() == port) {
+        theJetty = controlJetty;
+      }
+    }
+
+    if (theJetty == null)
+      fail("Not able to find JettySolrRunner for port: "+port);
+
+    return theJetty;
+  }
+
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+
+  protected int getNextAvailablePort() throws Exception {
+    int port = -1;
+    try (ServerSocket s = new ServerSocket(0)) {
+      port = s.getLocalPort();
+    }
+    return port;
+  }
+
   private File getRelativeSolrHomePath(File solrHome) {
     String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
     String base = new File(solrHome.getPath()).getAbsolutePath();
@@ -1455,6 +1541,13 @@ public abstract class AbstractFullDistri
     
     System.clearProperty("zkHost");
     System.clearProperty("numShards");
+
+    // close socket proxies after super.tearDown
+    if (!proxies.isEmpty()) {
+      for (SocketProxy proxy : proxies.values()) {
+        proxy.close();
+      }
+    }
   }
   
   @Override
@@ -1769,4 +1862,92 @@ public abstract class AbstractFullDistri
     baseServer.shutdown();
     return r;
   }
+
+  protected List<Replica> ensureAllReplicasAreActive(String testCollectionName, String shardId, int shards, int rf, int maxWaitSecs) throws Exception {
+    long startMs = System.currentTimeMillis();
+
+    Map<String,Replica> notLeaders = new HashMap<String,Replica>();
+
+    ZkStateReader zkr = cloudClient.getZkStateReader();
+    zkr.updateClusterState(true); // force the state to be fresh
+
+    ClusterState cs = zkr.getClusterState();
+    Collection<Slice> slices = cs.getActiveSlices(testCollectionName);
+    assertTrue(slices.size() == shards);
+    boolean allReplicasUp = false;
+    long waitMs = 0L;
+    long maxWaitMs = maxWaitSecs * 1000L;
+    Replica leader = null;
+    while (waitMs < maxWaitMs && !allReplicasUp) {
+      // refresh state every 2 secs
+      if (waitMs % 2000 == 0)
+        cloudClient.getZkStateReader().updateClusterState(true);
+
+      cs = cloudClient.getZkStateReader().getClusterState();
+      assertNotNull(cs);
+      Slice shard = cs.getSlice(testCollectionName, shardId);
+      assertNotNull("No Slice for "+shardId, shard);
+      allReplicasUp = true; // assume true
+      Collection<Replica> replicas = shard.getReplicas();
+      assertTrue(replicas.size() == rf);
+      leader = shard.getLeader();
+      assertNotNull(leader);
+      log.info("Found "+replicas.size()+" replicas and leader on "+
+          leader.getNodeName()+" for "+shardId+" in "+testCollectionName);
+
+      // ensure all replicas are "active" and identify the non-leader replica
+      for (Replica replica : replicas) {
+        String replicaState = replica.getStr(ZkStateReader.STATE_PROP);
+        if (!ZkStateReader.ACTIVE.equals(replicaState)) {
+          log.info("Replica " + replica.getName() + " is currently " + replicaState);
+          allReplicasUp = false;
+        }
+
+        if (!leader.equals(replica))
+          notLeaders.put(replica.getName(), replica);
+      }
+
+      if (!allReplicasUp) {
+        try {
+          Thread.sleep(500L);
+        } catch (Exception ignoreMe) {}
+        waitMs += 500L;
+      }
+    } // end while
+
+    if (!allReplicasUp)
+      fail("Didn't see all replicas for shard "+shardId+" in "+testCollectionName+
+          " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
+
+    if (notLeaders.isEmpty())
+      fail("Didn't isolate any replicas that are not the leader! ClusterState: " + printClusterStateInfo());
+
+    long diffMs = (System.currentTimeMillis() - startMs);
+    log.info("Took " + diffMs + " ms to see all replicas become active.");
+
+    List<Replica> replicas = new ArrayList<Replica>();
+    replicas.addAll(notLeaders.values());
+    return replicas;
+  }
+
+  protected String printClusterStateInfo() throws Exception {
+    return printClusterStateInfo(null);
+  }
+
+  protected String printClusterStateInfo(String collection) throws Exception {
+    cloudClient.getZkStateReader().updateClusterState(true);
+    String cs = null;
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    if (collection != null) {
+      cs = clusterState.getCollection(collection).toString();
+    } else {
+      Map<String,DocCollection> map = new HashMap<String,DocCollection>();
+      for (String coll : clusterState.getCollections())
+        map.put(coll, clusterState.getCollection(coll));
+      CharArr out = new CharArr();
+      new JSONWriter(out, 2).write(map);
+      cs = out.toString();
+    }
+    return cs;
+  }
 }

Added: lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java?rev=1630196&view=auto
==============================================================================
--- lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java (added)
+++ lucene/dev/branches/lucene_solr_4_10/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java Wed Oct  8 18:41:39 2014
@@ -0,0 +1,407 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kindly borrowed the idea and base implementation from the ActiveMQ project;
+ * useful for blocking traffic on a specified port.
+ */
+public class SocketProxy {
+  
+  private static final transient Logger log = LoggerFactory.getLogger(SocketProxy.class);
+  
+  public static final int ACCEPT_TIMEOUT_MILLIS = 100; // SO_TIMEOUT the Acceptor puts on the listening socket
+  
+  private URI proxyUrl; // target URI with its port replaced by the bound listen port; assigned by the first open()
+  private URI target; // endpoint every Bridge connects to
+  
+  private Acceptor acceptor; // replaced on every open()
+  private ServerSocket serverSocket; // replaced on every open()
+  
+  private CountDownLatch closed = new CountDownLatch(1); // counted down on close; awaited by waitUntilClosed()
+  
+  public List<Bridge> connections = new LinkedList<Bridge>(); // live bridges; this class always synchronizes on the list itself
+  
+  private int listenPort = 0; // port requested for the first bind; 0 asks the system for a free port
+  
+  private int receiveBufferSize = -1; // only applied to sockets when > 0
+  
+  private boolean pauseAtStart = false; // when true, open() pauses the acceptor before starting its thread
+  
+  private int acceptBacklog = 50; // backlog handed to ServerSocket.bind()
+  
+  public SocketProxy() throws Exception {} // leaves the proxy unopened: no target and no listening socket until open()
+  
+  public SocketProxy(URI uri) throws Exception { // delegates with listen port 0
+    this(0, uri);
+  }
+  
+  public SocketProxy(int port, URI uri) throws Exception { // records listen port and target, then opens immediately
+    listenPort = port;
+    target = uri;
+    open(); // binds the listening socket and starts the acceptor thread
+  }
+  
+  @Override public String toString() { // short label for logs: the requested listen port and the proxied target
+    return "SocketProxy: port="+listenPort+"; target="+target;
+  }
+    
+  public void setReceiveBufferSize(int receiveBufferSize) { // stored only; applied to sockets set up later, and only when > 0
+    this.receiveBufferSize = receiveBufferSize;
+  }
+  
+  public void setTarget(URI tcpBrokerUri) { // stored only; an existing Acceptor keeps the URI it was constructed with
+    target = tcpBrokerUri;
+  }
+  
+  public void open() throws Exception { // (re)creates the listening socket and starts a fresh acceptor thread
+    serverSocket = createServerSocket(target);
+    serverSocket.setReuseAddress(true); // set before bind() so a reopen can reclaim the same port
+    if (receiveBufferSize > 0) {
+      serverSocket.setReceiveBufferSize(receiveBufferSize);
+    }
+    if (proxyUrl == null) { // first open: bind the requested port and derive the public URL from it
+      serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
+      proxyUrl = urlFromSocket(target, serverSocket);
+    } else { // reopen: reuse the already-published port, honouring the configured backlog here too
+      serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()), acceptBacklog);
+    }
+    acceptor = new Acceptor(serverSocket, target);
+    if (pauseAtStart) {
+      acceptor.pause();
+    }
+    new Thread(null, acceptor, "SocketProxy-Acceptor-"
+        + serverSocket.getLocalPort()).start();
+    closed = new CountDownLatch(1); // fresh latch so waitUntilClosed() tracks this open/close cycle
+  }
+  
+  private boolean isSsl(URI target) { // true only for the exact scheme "ssl"; a null scheme yields false
+    return "ssl".equals(target.getScheme());
+  }
+  
+  // Creates the listening socket, not yet bound: from the default SSL factory for ssl targets, plain otherwise.
+  private ServerSocket createServerSocket(URI target) throws Exception {
+    return isSsl(target)
+        ? SSLServerSocketFactory.getDefault().createServerSocket()
+        : new ServerSocket();
+  }
+  
+  // Creates the outbound socket, not yet connected: from the default SSL factory for ssl targets, plain otherwise.
+  private Socket createSocket(URI target) throws Exception {
+    return isSsl(target)
+        ? SSLSocketFactory.getDefault().createSocket()
+        : new Socket();
+  }
+  
+  public URI getUrl() { // the proxy's own address; null until the first open() has bound the listening socket
+    return proxyUrl;
+  }
+  
+  /* Close all proxy connections and the acceptor; tolerates a proxy that was never opened. */
+  public void close() {
+    List<Bridge> connections;
+    synchronized (this.connections) { // snapshot: Bridge.close() removes itself from the live list
+      connections = new ArrayList<Bridge>(this.connections);
+    }
+    log.warn("Closing " + connections.size()+" connections to: "+getUrl());
+    for (Bridge con : connections) {
+      closeConnection(con);
+    }
+    if (acceptor != null) { // null if open() never got as far as creating it (e.g. no-arg constructor)
+      acceptor.close();
+    }
+    closed.countDown();
+  }
+  
+  /*
+   * Closes only the receive socket of every current bridge; the acceptor keeps running.
+   */
+  public void halfClose() {
+    final List<Bridge> snapshot;
+    synchronized (this.connections) {
+      snapshot = new ArrayList<Bridge>(this.connections);
+    }
+    log.info("halfClose, numConnections=" + snapshot.size());
+    for (Bridge bridge : snapshot) {
+      halfCloseConnection(bridge);
+    }
+  }
+  
+  public boolean waitUntilClosed(long timeoutSeconds) // waits for the current 'closed' latch to be counted down
+      throws InterruptedException {
+    return closed.await(timeoutSeconds, TimeUnit.SECONDS); // false when the timeout elapses first
+  }
+  
+  /*
+   * Called after a close to restart the acceptor on the same port. Failures are logged, not thrown.
+   */
+  public void reopen() {
+    log.info("Re-opening connectivity to "+getUrl());
+    try {
+      open();
+    } catch (Exception e) {
+      log.warn("exception on reopen url:" + getUrl(), e); // a failed reopen leaves the proxy down, so make it visible
+    }
+  }
+  
+  /*
+   * pause accepting new connections and data transfer through existing proxy
+   * connections. All sockets remain open
+   */
+  public void pause() {
+    synchronized (connections) { // lock held throughout, so the acceptor cannot add a bridge between the two steps
+      log.info("pause, numConnections=" + connections.size());
+      acceptor.pause();
+      for (Bridge con : connections) {
+        con.pause();
+      }
+    }
+  }
+  
+  /*
+   * continue after pause
+   */
+  public void goOn() {
+    synchronized (connections) {
+      log.info("goOn, numConnections=" + connections.size());
+      for (Bridge con : connections) {
+        con.goOn();
+      }
+    }
+    acceptor.goOn(); // resumed last and outside the lock, after the existing bridges
+  }
+  
+  private void closeConnection(Bridge c) { // best effort: a failure is only logged at debug
+    try {
+      c.close();
+    } catch (Exception e) {
+      log.debug("exception on close of: " + c, e);
+    }
+  }
+  
+  private void halfCloseConnection(Bridge c) { // best effort: a failure is only logged at debug
+    try {
+      c.halfClose();
+    } catch (Exception e) {
+      log.debug("exception on half close of: " + c, e);
+    }
+  }
+  
+  public boolean isPauseAtStart() { // whether open() pauses the acceptor before starting it
+    return pauseAtStart;
+  }
+  
+  public void setPauseAtStart(boolean pauseAtStart) { // read by open(); the URI constructors open immediately, so set it before a later open()
+    this.pauseAtStart = pauseAtStart;
+  }
+  
+  public int getAcceptBacklog() { // backlog value open() hands to ServerSocket.bind()
+    return acceptBacklog;
+  }
+  
+  public void setAcceptBacklog(int acceptBacklog) { // stored only; consumed by open() when it binds the listening socket
+    this.acceptBacklog = acceptBacklog;
+  }
+  
+  // Builds the proxy's own URI: a copy of uri whose port is replaced by the local port
+  // that serverSocket reports.
+  private URI urlFromSocket(URI uri, ServerSocket serverSocket)
+      throws Exception {
+    return new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), serverSocket.getLocalPort(),
+        uri.getPath(), uri.getQuery(), uri.getFragment());
+  }
+  
+  public class Bridge { // one proxied connection: the accepted socket, an outbound socket to the target, and one Pump per direction
+    
+    private Socket receiveSocket; // socket handed in by the Acceptor
+    private Socket sendSocket; // socket this bridge opens to the target
+    private Pump requestThread; // copies receiveSocket -> sendSocket
+    private Pump responseThread; // copies sendSocket -> receiveSocket
+    
+    public Bridge(Socket socket, URI target) throws Exception { // connects to target and starts both pumps
+      receiveSocket = socket;
+      sendSocket = createSocket(target);
+      if (receiveBufferSize > 0) {
+        sendSocket.setReceiveBufferSize(receiveBufferSize);
+      }
+      sendSocket.connect(new InetSocketAddress(target.getHost(), target // NOTE(review): if this throws, neither socket is closed here
+          .getPort()));
+      linkWithThreads(receiveSocket, sendSocket);
+      log.info("proxy connection " + sendSocket + ", receiveBufferSize="
+          + sendSocket.getReceiveBufferSize());
+    }
+    
+    public void goOn() { // releases both pumps from a pause
+      responseThread.goOn();
+      requestThread.goOn();
+    }
+    
+    public void pause() { // both pumps stop forwarding; the sockets stay open
+      requestThread.pause();
+      responseThread.pause();
+    }
+    
+    public void close() throws Exception { // unregisters this bridge, then closes both sockets
+      synchronized (connections) {
+        connections.remove(this);
+      }
+      receiveSocket.close(); // NOTE(review): if this throws, sendSocket is left open
+      sendSocket.close();
+    }
+    
+    public void halfClose() throws Exception { // closes only the accepted side; the bridge stays registered in connections
+      receiveSocket.close();
+    }
+    
+    private void linkWithThreads(Socket source, Socket dest) { // creates and starts one pump per direction
+      requestThread = new Pump(source, dest);
+      requestThread.start();
+      responseThread = new Pump(dest, source);
+      responseThread.start();
+    }
+    
+    public class Pump extends Thread { // copies bytes one way between two sockets until EOF or an error
+      
+      protected Socket src;
+      private Socket destination;
+      private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>(); // count 0 = running, count 1 = paused
+      
+      public Pump(Socket source, Socket dest) {
+        super("SocketProxy-DataTransfer-" + source.getPort() + ":"
+            + dest.getPort());
+        src = source;
+        destination = dest;
+        pause.set(new CountDownLatch(0)); // starts unpaused
+      }
+      
+      public void pause() { // NOTE(review): a second pause() swaps the latch, so a pump already waiting on the old one is not released by goOn()
+        pause.set(new CountDownLatch(1));
+      }
+      
+      public void goOn() { // opens whichever latch is current; a no-op when not paused
+        pause.get().countDown();
+      }
+      
+      public void run() {
+        byte[] buf = new byte[1024];
+        try {
+          InputStream in = src.getInputStream();
+          OutputStream out = destination.getOutputStream();
+          while (true) {
+            int len = in.read(buf);
+            if (len == -1) {
+              log.debug("read eof from:" + src);
+              break; // the pump ends here without closing either socket or unregistering the bridge
+            }
+            pause.get().await(); // when paused, the chunk just read is held back until goOn()
+            out.write(buf, 0, len);
+          }
+        } catch (Exception e) {
+          log.debug("read/write failed, reason: " + e.getLocalizedMessage());
+          try {
+            if (!receiveSocket.isClosed()) {
+              // After a halfClose the receive socket is already closed, so this branch is
+              // skipped; otherwise close the whole bridge so the remote end sees the close too.
+              close();
+            }
+          } catch (Exception ignore) {}
+        }
+      }
+    }
+  }
+  
+  public class Acceptor implements Runnable { // accept loop for one listening socket; wraps each accepted connection in a Bridge
+    
+    private ServerSocket socket;
+    private URI target;
+    private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>(); // count 0 = running, count 1 = paused
+    
+    public Acceptor(ServerSocket serverSocket, URI uri) {
+      socket = serverSocket;
+      target = uri;
+      pause.set(new CountDownLatch(0)); // starts unpaused
+      try {
+        socket.setSoTimeout(ACCEPT_TIMEOUT_MILLIS); // accept() times out periodically so run() can re-check isClosed()
+      } catch (SocketException e) {
+        log.warn("Failed to set accept timeout on " + socket, e);
+      }
+    }
+    
+    public void pause() { // run() blocks on the new latch before and after its next accept()
+      pause.set(new CountDownLatch(1));
+    }
+    
+    public void goOn() { // opens whichever latch is current; a no-op when not paused
+      pause.get().countDown();
+    }
+    
+    public void run() {
+      try {
+        while (!socket.isClosed()) {
+          pause.get().await();
+          try {
+            Socket source = socket.accept();
+            pause.get().await(); // a connection accepted just as pause() hit is held here until goOn()
+            if (receiveBufferSize > 0) {
+              source.setReceiveBufferSize(receiveBufferSize);
+            }
+            log.info("accepted " + source + ", receiveBufferSize:"
+                + source.getReceiveBufferSize());
+            synchronized (connections) {
+              connections.add(new Bridge(source, target));
+            }
+          } catch (SocketTimeoutException expected) {} // routine accept timeout: loop and re-check
+        }
+      } catch (Exception e) { // anything else, including a failed Bridge construction, ends the accept loop
+        log.debug("acceptor: finished for reason: " + e.getLocalizedMessage());
+      }
+    }
+    
+    public void close() { // closes the listening socket and always releases latch waiters
+      try {
+        socket.close();
+      } catch (IOException ignored) {} // best effort; the latch and a paused run() are still released below
+      closed.countDown();
+      goOn();
+    }
+  }
+  
+}
+