Posted to commits@lucene.apache.org by th...@apache.org on 2014/09/24 17:21:47 UTC

svn commit: r1627347 - in /lucene/dev/trunk/solr: ./ core/src/java/org/apache/solr/cloud/ core/src/java/org/apache/solr/update/processor/ core/src/test/org/apache/solr/cloud/ test-framework/src/java/org/apache/solr/cloud/

Author: thelabdude
Date: Wed Sep 24 15:21:47 2014
New Revision: 1627347

URL: http://svn.apache.org/r1627347
Log:
SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread; refactor HttpPartitionTest to resolve jenkins failures.
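
For reference before the diffs: the fencepost fix is the one-character change in the LeaderInitiatedRecoveryThread hunk below, where the retry-loop bound goes from "++tries < maxTries" to "++tries <= maxTries". A minimal standalone sketch of the off-by-one follows (it assumes the attempt counter starts at zero, which the "tries > 1" logging in that hunk suggests; the class and method names are illustrative and not part of the patch):

    // RetryBoundSketch.java -- illustrates the SOLR-6511 fencepost; not part of the commit
    public class RetryBoundSketch {

      // mirrors the loop shape in the hunk: attempts stop when the bound check fails
      static int attempts(int maxTries, boolean inclusiveBound) {
        int tries = 0;      // assumed initial value (not shown in the hunk)
        int attempts = 0;
        while (inclusiveBound ? ++tries <= maxTries : ++tries < maxTries) {
          attempts++;       // one REQUESTRECOVERY request per iteration in the real thread
        }
        return attempts;
      }

      public static void main(String[] args) {
        // old exclusive bound: a single-try caller (maxTries = 1) never sends the request
        System.out.println(attempts(1, false));   // 0
        System.out.println(attempts(120, false)); // 119
        // new inclusive bound: exactly maxTries attempts
        System.out.println(attempts(1, true));    // 1
        System.out.println(attempts(120, true));  // 120
      }
    }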

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java   (with props)
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/SocketProxy.java
      - copied unchanged from r1627328, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
Removed:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/SocketProxy.java
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
    lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep 24 15:21:47 2014
@@ -201,6 +201,8 @@ Bug Fixes
 
 * SOLR-6509: Solr start scripts interactive mode doesn't honor -z argument (Timothy Potter)
 
+* SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)
+
 
 Other Changes
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep 24 15:21:47 2014
@@ -387,7 +387,8 @@ final class ShardLeaderElectionContext e
                                                     collection,
                                                     shardId,
                                                     coreNodeProps,
-                                                    120);
+                                                    120,
+                                                    coreNodeName);
               zkController.ensureReplicaInLeaderInitiatedRecovery(
                   collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
               

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Sep 24 15:21:47 2014
@@ -50,13 +50,15 @@ public class LeaderInitiatedRecoveryThre
   protected String shardId;
   protected ZkCoreNodeProps nodeProps;
   protected int maxTries;
+  protected String leaderCoreNodeName;
   
   public LeaderInitiatedRecoveryThread(ZkController zkController, 
                                        CoreContainer cc, 
                                        String collection, 
                                        String shardId, 
                                        ZkCoreNodeProps nodeProps,
-                                       int maxTries)
+                                       int maxTries,
+                                       String leaderCoreNodeName)
   {
     super("LeaderInitiatedRecoveryThread-"+nodeProps.getCoreName());
     this.zkController = zkController;
@@ -65,6 +67,7 @@ public class LeaderInitiatedRecoveryThre
     this.shardId = shardId;    
     this.nodeProps = nodeProps;
     this.maxTries = maxTries;
+    this.leaderCoreNodeName = leaderCoreNodeName;
     
     setDaemon(true);
   }
@@ -103,7 +106,7 @@ public class LeaderInitiatedRecoveryThre
     recoverRequestCmd.setAction(CoreAdminAction.REQUESTRECOVERY);
     recoverRequestCmd.setCoreName(coreNeedingRecovery);
     
-    while (continueTrying && ++tries < maxTries) {
+    while (continueTrying && ++tries <= maxTries) {
       if (tries > 1) {
         log.warn("Asking core={} coreNodeName={} on " + recoveryUrl +
             " to recover; unsuccessful after "+tries+" of "+maxTries+" attempts so far ...", coreNeedingRecovery, replicaCoreNodeName);
@@ -150,7 +153,7 @@ public class LeaderInitiatedRecoveryThre
         
         if (coreContainer.isShutDown()) {
           log.warn("Stop trying to send recovery command to downed replica core={} coreNodeName={} on "
-              + replicaNodeName + " because my core container is close.", coreNeedingRecovery, replicaCoreNodeName);
+              + replicaNodeName + " because my core container is closed.", coreNeedingRecovery, replicaCoreNodeName);
           continueTrying = false;
           break;
         }
@@ -170,6 +173,24 @@ public class LeaderInitiatedRecoveryThre
           break;
         }
 
+        // stop trying if I'm no longer the leader
+        if (leaderCoreNodeName != null && collection != null) {
+          String leaderCoreNodeNameFromZk = null;
+          try {
+            leaderCoreNodeNameFromZk = zkController.getZkStateReader().getLeaderRetry(collection, shardId, 1000).getName();
+          } catch (Exception exc) {
+            log.error("Failed to determine if " + leaderCoreNodeName + " is still the leader for " + collection +
+                " " + shardId + " before starting leader-initiated recovery thread for " + replicaUrl + " due to: " + exc);
+          }
+          if (!leaderCoreNodeName.equals(leaderCoreNodeNameFromZk)) {
+            log.warn("Stop trying to send recovery command to downed replica core=" + coreNeedingRecovery +
+                ",coreNodeName=" + replicaCoreNodeName + " on " + replicaNodeName + " because " +
+                leaderCoreNodeName + " is no longer the leader! New leader is " + leaderCoreNodeNameFromZk);
+            continueTrying = false;
+            break;
+          }
+        }
+
         // additional safeguard against the replica trying to be in the active state
         // before acknowledging the leader initiated recovery command
         if (continueTrying && collection != null && shardId != null) {

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Sep 24 15:21:47 2014
@@ -595,7 +595,9 @@ public class DistributedUpdateProcessor 
         String fromCollection = req.getParams().get(DISTRIB_FROM_COLLECTION); // is it because of a routing rule?
         if (fromCollection == null)  {
           log.error("Request says it is coming from leader, but we are the leader: " + req.getParamString());
-          throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+          SolrException solrExc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Request says it is coming from leader, but we are the leader");
+          solrExc.setMetadata("cause", "LeaderChanged");
+          throw solrExc;
         }
       }
     }
@@ -805,57 +807,92 @@ public class DistributedUpdateProcessor 
           DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));       
       if (phase != DistribPhase.FROMLEADER)
         continue; // don't have non-leaders try to recover other nodes
-      
-      final String replicaUrl = error.req.node.getUrl();      
+
+      final String replicaUrl = error.req.node.getUrl();
+
+      // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
+      String cause = (error.e instanceof SolrException) ? ((SolrException)error.e).getMetadata("cause") : null;
+      if ("LeaderChanged".equals(cause)) {
+        // let's just fail this request and let the client retry? or just call processAdd again?
+        log.error("On "+cloudDesc.getCoreNodeName()+", replica "+replicaUrl+
+            " now thinks it is the leader! Failing the request to let the client retry! "+error.e);
+        rsp.setException(error.e);
+        break;
+      }
 
       int maxTries = 1;       
       boolean sendRecoveryCommand = true;
       String collection = null;
       String shardId = null;
-      
+
       if (error.req.node instanceof StdNode) {
         StdNode stdNode = (StdNode)error.req.node;
         collection = stdNode.getCollection();
         shardId = stdNode.getShardId();
+
+        // before we go setting other replicas to down, make sure we're still the leader!
+        String leaderCoreNodeName = null;
         try {
-          // if false, then the node is probably not "live" anymore
-          sendRecoveryCommand = 
-              zkController.ensureReplicaInLeaderInitiatedRecovery(collection, 
-                                                                  shardId, 
-                                                                  replicaUrl, 
-                                                                  stdNode.getNodeProps(), 
-                                                                  false);
-          
-          // we want to try more than once, ~10 minutes
-          if (sendRecoveryCommand) {
-            maxTries = 120;
-          } // else the node is no longer "live" so no need to send any recovery command
-          
-        } catch (Exception e) {
-          log.error("Leader failed to set replica "+
-              error.req.node.getUrl()+" state to DOWN due to: "+e, e);
+          leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+        } catch (Exception exc) {
+          log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
+              " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+        }
+
+        if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
+          try {
+            // if false, then the node is probably not "live" anymore
+            sendRecoveryCommand =
+                zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
+                    shardId,
+                    replicaUrl,
+                    stdNode.getNodeProps(),
+                    false);
+
+            // we want to try more than once, ~10 minutes
+            if (sendRecoveryCommand) {
+              maxTries = 120;
+            } // else the node is no longer "live" so no need to send any recovery command
+
+          } catch (KeeperException.SessionExpiredException see) {
+            log.error("Leader failed to set replica " +
+                error.req.node.getUrl() + " state to DOWN due to: " + see, see);
+            // our session is expired, which means our state is suspect, so don't go
+            // putting other replicas in recovery (see SOLR-6511)
+            sendRecoveryCommand = false;
+          } catch (Exception e) {
+            log.error("Leader failed to set replica " +
+                error.req.node.getUrl() + " state to DOWN due to: " + e, e);
+            // will go ahead and try to send the recovery command once after this error
+          }
+        } else {
+          // not the leader anymore maybe?
+          sendRecoveryCommand = false;
+          log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
+              shardId+", no request recovery command will be sent!");
         }
       } // else not a StdNode, recovery command still gets sent once
             
       if (!sendRecoveryCommand)
         continue; // the replica is already in recovery handling or is not live   
-      
-      Throwable rootCause = SolrException.getRootCause(error.e);      
-      log.error("Setting up to try to start recovery on replica "+replicaUrl+" after: "+rootCause);
-      
+
+      Throwable rootCause = SolrException.getRootCause(error.e);
+      log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+
       // try to send the recovery command to the downed replica in a background thread
-      CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();      
-      LeaderInitiatedRecoveryThread lirThread = 
+      CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
+      LeaderInitiatedRecoveryThread lirThread =
           new LeaderInitiatedRecoveryThread(zkController,
-                                            coreContainer,
-                                            collection,
-                                            shardId,
-                                            error.req.node.getNodeProps(),
-                                            maxTries);
+              coreContainer,
+              collection,
+              shardId,
+              error.req.node.getNodeProps(),
+              maxTries,
+              cloudDesc.getCoreNodeName()); // core node name of current leader
       ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
-      executor.execute(lirThread);      
+      executor.execute(lirThread);
     }
-    
+
     if (replicationTracker != null) {
       rsp.getResponseHeader().add(UpdateRequest.REPFACT, replicationTracker.getAchievedRf());
       rsp.getResponseHeader().add(UpdateRequest.MIN_REPFACT, replicationTracker.minRf);
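
Taken together, the two DistributedUpdateProcessor hunks above add a small handshake: a node that receives a request marked as coming from the leader while it believes itself to be the leader tags its SERVICE_UNAVAILABLE rejection with cause=LeaderChanged, and the node that forwarded the update checks that tag and fails the request back to the client rather than trying to put the other node into leader-initiated recovery. A minimal sketch of that round trip using the SolrException metadata API (the helper names are illustrative, not part of the patch):

    // LeaderChangedSketch.java -- illustrative only; assumes solr-solrj/solr-core on the classpath
    import org.apache.solr.common.SolrException;
    import org.apache.solr.common.SolrException.ErrorCode;

    public class LeaderChangedSketch {

      // what the receiving node throws when a FROMLEADER request arrives but it is the leader
      static SolrException rejectForwardedUpdate() {
        SolrException exc = new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
            "Request says it is coming from leader, but we are the leader");
        exc.setMetadata("cause", "LeaderChanged");
        return exc;
      }

      // what the forwarding node checks before deciding to start leader-initiated recovery
      static boolean shouldStartLeaderInitiatedRecovery(Throwable error) {
        String cause = (error instanceof SolrException)
            ? ((SolrException) error).getMetadata("cause") : null;
        return !"LeaderChanged".equals(cause);
      }

      public static void main(String[] args) {
        // prints false: the update is failed back to the client for a retry instead
        System.out.println(shouldStartLeaderInitiatedRecovery(rejectForwardedUpdate()));
      }
    }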

Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Sep 24 15:21:47 2014
@@ -18,9 +18,6 @@ package org.apache.solr.cloud;
  */
 
 import java.io.File;
-import java.net.ServerSocket;
-import java.net.URI;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -39,6 +35,7 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
@@ -55,26 +52,24 @@ import org.slf4j.LoggerFactory;
  * Simulates HTTP partitions between a leader and replica but the replica does
  * not lose its ZooKeeper connection.
  */
+
 @Slow
 @SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
-@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-6241")
 public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
   
-  private static final transient Logger log = 
+  protected static final transient Logger log =
       LoggerFactory.getLogger(HttpPartitionTest.class);
   
   // To prevent the test assertions firing too fast before cluster state
   // recognizes (and propagates) partitions
-  private static final long sleepMsBeforeHealPartition = 2000L;
-  
-  private static final int maxWaitSecsToSeeAllActive = 30;
-  
-  private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
-  
+  protected static final long sleepMsBeforeHealPartition = 2000L;
+
+  protected static final int maxWaitSecsToSeeAllActive = 30;
+
   public HttpPartitionTest() {
     super();
     sliceCount = 2;
-    shardCount = 2;
+    shardCount = 3;
   }
   
   @Before
@@ -87,60 +82,24 @@ public class HttpPartitionTest extends A
   @Override
   @After
   public void tearDown() throws Exception {    
-    System.clearProperty("numShards");
-    
     try {
       super.tearDown();
     } catch (Exception exc) {}
     
     resetExceptionIgnores();
-    
-    // close socket proxies after super.tearDown
-    if (!proxies.isEmpty()) {
-      for (SocketProxy proxy : proxies.values()) {
-        proxy.close();
-      }
-    }    
   }
   
   /**
-   * Overrides the parent implementation so that we can configure a socket proxy
-   * to sit infront of each Jetty server, which gives us the ability to simulate
-   * network partitions without having to fuss with IPTables (which is not very
-   * cross platform friendly).
+   * Overrides the parent implementation to install a SocketProxy in front of the Jetty server.
    */
   @Override
   public JettySolrRunner createJetty(File solrHome, String dataDir,
       String shardList, String solrConfigOverride, String schemaOverride)
-      throws Exception {
-    
-    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
-        0, solrConfigOverride, schemaOverride, false,
-        getExtraServlets(), sslConfig, getExtraRequestFilters());
-    jetty.setShards(shardList);
-    jetty.setDataDir(getDataDir(dataDir));      
-    
-    // setup to proxy Http requests to this server unless it is the control
-    // server
-    int proxyPort = getNextAvailablePort();
-    jetty.setProxyPort(proxyPort);        
-    jetty.start();        
-    
-    // create a socket proxy for the jetty server ...
-    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
-    proxies.put(proxy.getUrl(), proxy);
-    
-    return jetty;
+      throws Exception
+  {
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
   }
-  
-  protected int getNextAvailablePort() throws Exception {    
-    int port = -1;
-    try (ServerSocket s = new ServerSocket(0)) {
-      port = s.getLocalPort();
-    }
-    return port;
-  }
-   
+
   @Override
   public void doTest() throws Exception {
     waitForThingsToLevelOut(30000);
@@ -148,12 +107,16 @@ public class HttpPartitionTest extends A
     // test a 1x2 collection
     testRf2();
 
+    waitForThingsToLevelOut(30000);
+
     // now do similar for a 1x3 collection while taking 2 replicas on-and-off
     // each time
     testRf3();
 
-    // kill a leader and make sure recovery occurs as expected
-    testRf3WithLeaderFailover();
+    waitForThingsToLevelOut(30000);
+
+    // have the leader lose its Zk session temporarily
+    testLeaderZkSessionLoss();
   }
   
   protected void testRf2() throws Exception {
@@ -247,11 +210,9 @@ public class HttpPartitionTest extends A
         ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
     assertTrue("Expected 2 replicas for collection " + testCollectionName
         + " but found " + notLeaders.size() + "; clusterState: "
-        + printClusterStateInfo(),
+        + printClusterStateInfo(testCollectionName),
         notLeaders.size() == 2);
-    
-    sendDoc(1);
-    
+
     // ok, now introduce a network partition between the leader and the replica
     SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
     
@@ -289,127 +250,112 @@ public class HttpPartitionTest extends A
       log.warn("Could not delete collection {} after test completed", testCollectionName);
     }
   }
-  
-  protected void testRf3WithLeaderFailover() throws Exception {
-    // now let's create a partition in one of the replicas and outright
-    // kill the leader ... see what happens
-    // create a collection that has 1 shard but 3 replicas
-    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
-    createCollection(testCollectionName, 1, 3, 1);
+
+  // test inspired by SOLR-6511
+  protected void testLeaderZkSessionLoss() throws Exception {
+
+    String testCollectionName = "c8n_1x2_leader_session_loss";
+    createCollection(testCollectionName, 1, 2, 1);
     cloudClient.setDefaultCollection(testCollectionName);
-    
-    sendDoc(1);
-    
-    List<Replica> notLeaders = 
-        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
-    assertTrue("Expected 2 replicas for collection " + testCollectionName
-        + " but found " + notLeaders.size() + "; clusterState: "
-        + printClusterStateInfo(),
-        notLeaders.size() == 2);
-        
+
     sendDoc(1);
-    
-    // ok, now introduce a network partition between the leader and the replica
-    SocketProxy proxy0 = null;
-    proxy0 = getProxyForReplica(notLeaders.get(0));
-    
-    proxy0.close();
-    
-    // indexing during a partition
-    sendDoc(2);
-    
-    Thread.sleep(sleepMsBeforeHealPartition);
-    
-    proxy0.reopen();
-    
-    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-    
-    proxy1.close();
-    
-    sendDoc(3);
-    
-    Thread.sleep(sleepMsBeforeHealPartition);
-    proxy1.reopen();
-    
-    // sent 4 docs in so far, verify they are on the leader and replica
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive); 
-    
-    sendDoc(4);
-    
-    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);    
-        
-    Replica leader = 
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 1 replicas for collection " + testCollectionName
+            + " but found " + notLeaders.size() + "; clusterState: "
+            + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 1);
+
+    Replica leader =
         cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
     String leaderNode = leader.getNodeName();
     assertNotNull("Could not find leader for shard1 of "+
-      testCollectionName+"; clusterState: "+printClusterStateInfo(), leader);
+        testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
     JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-    
-    // since maxShardsPerNode is 1, we're safe to kill the leader
-    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);    
-    proxy0 = getProxyForReplica(notLeaders.get(0));
-    proxy0.close();
-        
-    // indexing during a partition
-    // doc should be on leader and 1 replica
-    sendDoc(5);
 
-    assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
-    assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
-
-    Thread.sleep(sleepMsBeforeHealPartition);
-    
-    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+    HttpSolrServer leaderSolr = getHttpSolrServer(leader, testCollectionName);
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField(id, String.valueOf(2));
+    doc.addField("a_t", "hello" + 2);
 
-    // kill the leader
-    leaderJetty.stop();
-    
-    if (leaderJetty.isRunning())
-      fail("Failed to stop the leader on "+leaderNode);
-        
-    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
-    if (oldLeaderProxy != null) {
-      oldLeaderProxy.close();      
-    } else {
-      log.warn("No SocketProxy found for old leader node "+leaderNode);      
-    }
+    // cause leader migration by expiring the current leader's zk session
+    chaosMonkey.expireSession(leaderJetty);
 
-    Thread.sleep(10000); // give chance for new leader to be elected.
-    
-    Replica newLeader = 
-        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
-        
-    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
-      printClusterStateInfo(),newLeader);
-        
-    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
-        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
-        printClusterStateInfo(), 
-        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
-    
-    proxy0.reopen();
-    
+    String expectedNewLeaderCoreNodeName = notLeaders.get(0).getName();
     long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
     while (System.nanoTime() < timeout) {
-      cloudClient.getZkStateReader().updateClusterState(true);
+      String currentLeaderName = null;
+      try {
+        Replica currentLeader =
+            cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+        currentLeaderName = currentLeader.getName();
+      } catch (Exception exc) {}
 
-      List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
-      if (activeReps.size() == 2) break;
-      Thread.sleep(1000);
+      if (expectedNewLeaderCoreNodeName.equals(currentLeaderName))
+        break; // new leader was elected after zk session expiration
+
+      Thread.sleep(500);
     }
 
-    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
-    assertTrue("Expected 2 of 3 replicas to be active but only found "+
-            participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+printClusterStateInfo(),
-        participatingReplicas.size() == 2);
+    Replica currentLeader =
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertEquals(expectedNewLeaderCoreNodeName, currentLeader.getName());
+
+    log.info("Sending doc 2 to old leader "+leader.getName());
+    try {
+      leaderSolr.add(doc);
+      leaderSolr.shutdown();
 
-    sendDoc(6);
+      Replica oldLeaderInRecovery = null;
+      for (Replica next : getActiveOrRecoveringReplicas(testCollectionName, "shard1")) {
+        if (next.getName().equals(leader.getName()) &&
+            ZkStateReader.RECOVERING.equals(next.getStr(ZkStateReader.STATE_PROP)))
+        {
+          oldLeaderInRecovery = next;
+          break;
+        }
+      }
 
+      // if the old leader is not active or recovering, the add should have failed
+      if (oldLeaderInRecovery != null) {
+        HttpSolrServer oldLeaderSolr = getHttpSolrServer(oldLeaderInRecovery, testCollectionName);
+        try {
+          assertDocExists(oldLeaderSolr, testCollectionName, "2");
+        } finally {
+          oldLeaderSolr.shutdown();
+        }
+      } else {
+        fail("Send doc 2 to old leader " + leader.getName() +
+            " should have failed! ClusterState: " + printClusterStateInfo(testCollectionName));
+      }
+
+    } catch (SolrException exc) {
+      // this is expected ..
+      leaderSolr = getHttpSolrServer(currentLeader, testCollectionName);
+      try {
+        leaderSolr.add(doc); // this should work
+      } finally {
+        leaderSolr.shutdown();
+      }
+    }
+
+    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
     Set<String> replicasToCheck = new HashSet<>();
     for (Replica stillUp : participatingReplicas)
       replicasToCheck.add(stillUp.getName());
     waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
-    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 2);
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
   }
 
   protected List<Replica> getActiveOrRecoveringReplicas(String testCollectionName, String shardId) throws Exception {    
@@ -431,21 +377,7 @@ public class HttpPartitionTest extends A
     replicas.addAll(activeReplicas.values());
     return replicas;
   }  
-  
-  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
-    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
-    assertNotNull(replicaBaseUrl);
-    URL baseUrl = new URL(replicaBaseUrl);
-    
-    SocketProxy proxy = proxies.get(baseUrl.toURI());
-    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
-      baseUrl = new URL(baseUrl.toExternalForm() + "/");
-      proxy = proxies.get(baseUrl.toURI());
-    }
-    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
-    return proxy;
-  }
-  
+
   protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
       String testCollectionName, int firstDocId, int lastDocId)
       throws Exception {
@@ -501,33 +433,11 @@ public class HttpPartitionTest extends A
   protected void assertDocExists(HttpSolrServer solr, String coll, String docId) throws Exception {
     QueryRequest qr = new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false"));
     NamedList rsp = solr.request(qr);
-    String match =
-        JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
+    String match = JSONTestUtil.matchObj("/id", rsp.get("doc"), new Integer(docId));
     assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
         + " due to: " + match + "; rsp="+rsp, match == null);
   }
 
-  protected JettySolrRunner getJettyOnPort(int port) {
-    JettySolrRunner theJetty = null;
-    for (JettySolrRunner jetty : jettys) {
-      if (port == jetty.getLocalPort()) {
-        theJetty = jetty;
-        break;
-      }
-    }    
-    
-    if (theJetty == null) {
-      if (controlJetty.getLocalPort() == port) {
-        theJetty = controlJetty;
-      }
-    }
-    
-    if (theJetty == null)
-      fail("Not able to find JettySolrRunner for port: "+port);
-    
-    return theJetty;
-  }
-  
   protected int getReplicaPort(Replica replica) {
     String replicaNode = replica.getNodeName();    
     String tmp = replicaNode.substring(replicaNode.indexOf(':')+1);
@@ -580,7 +490,7 @@ public class HttpPartitionTest extends A
 
     if (!allReplicasUp)
       fail("Didn't see replicas "+ replicasToCheck +
-          " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
+          " come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo(testCollectionName));
 
     long diffMs = (System.currentTimeMillis() - startMs);
     log.info("Took " + diffMs + " ms to see replicas ["+replicasToCheck+"] become active.");

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java?rev=1627347&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderFailoverAfterPartitionTest.java Wed Sep 24 15:21:47 2014
@@ -0,0 +1,181 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.lucene.util.LuceneTestCase.Slow;
+import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.Replica;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests leader-initiated recovery scenarios after a leader node fails
+ * and one of the replicas is out-of-sync.
+ */
+@Slow
+@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
+public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {
+
+  public LeaderFailoverAfterPartitionTest() {
+    super();
+  }
+
+
+  @Override
+  public void doTest() throws Exception {
+    waitForThingsToLevelOut(30000);
+
+    // kill a leader and make sure recovery occurs as expected
+    testRf3WithLeaderFailover();
+  }
+
+  protected void testRf3WithLeaderFailover() throws Exception {
+    // now let's create a partition in one of the replicas and outright
+    // kill the leader ... see what happens
+    // create a collection that has 1 shard but 3 replicas
+    String testCollectionName = "c8n_1x3_lf"; // _lf is leader fails
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+    
+    sendDoc(1);
+    
+    List<Replica> notLeaders = 
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+        + " but found " + notLeaders.size() + "; clusterState: "
+        + printClusterStateInfo(testCollectionName),
+        notLeaders.size() == 2);
+        
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy0 = null;
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    
+    proxy0.close();
+    
+    // indexing during a partition
+    sendDoc(2);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    proxy0.reopen();
+    
+    SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
+    
+    proxy1.close();
+    
+    sendDoc(3);
+    
+    Thread.sleep(sleepMsBeforeHealPartition);
+    proxy1.reopen();
+    
+    // sent 4 docs in so far, verify they are on the leader and replica
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive); 
+    
+    sendDoc(4);
+    
+    assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 4);    
+        
+    Replica leader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    String leaderNode = leader.getNodeName();
+    assertNotNull("Could not find leader for shard1 of "+
+      testCollectionName+"; clusterState: "+printClusterStateInfo(testCollectionName), leader);
+    JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+    
+    // since maxShardsPerNode is 1, we're safe to kill the leader
+    notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);    
+    proxy0 = getProxyForReplica(notLeaders.get(0));
+    proxy0.close();
+        
+    // indexing during a partition
+    // doc should be on leader and 1 replica
+    sendDoc(5);
+
+    assertDocExists(getHttpSolrServer(leader, testCollectionName), testCollectionName, "5");
+    assertDocExists(getHttpSolrServer(notLeaders.get(1), testCollectionName), testCollectionName, "5");
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+    
+    String shouldNotBeNewLeaderNode = notLeaders.get(0).getNodeName();
+
+    //chaosMonkey.expireSession(leaderJetty);
+    // kill the leader
+    leaderJetty.stop();
+    if (leaderJetty.isRunning())
+      fail("Failed to stop the leader on "+leaderNode);
+        
+    SocketProxy oldLeaderProxy = getProxyForReplica(leader);
+    if (oldLeaderProxy != null) {
+      oldLeaderProxy.close();      
+    } else {
+      log.warn("No SocketProxy found for old leader node "+leaderNode);      
+    }
+
+    Thread.sleep(10000); // give chance for new leader to be elected.
+    
+    Replica newLeader = 
+        cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 60000);
+        
+    assertNotNull("No new leader was elected after 60 seconds; clusterState: "+
+      printClusterStateInfo(testCollectionName),newLeader);
+        
+    assertTrue("Expected node "+shouldNotBeNewLeaderNode+
+        " to NOT be the new leader b/c it was out-of-sync with the old leader! ClusterState: "+
+        printClusterStateInfo(testCollectionName),
+        !shouldNotBeNewLeaderNode.equals(newLeader.getNodeName()));
+    
+    proxy0.reopen();
+    
+    long timeout = System.nanoTime() + TimeUnit.NANOSECONDS.convert(60, TimeUnit.SECONDS);
+    while (System.nanoTime() < timeout) {
+      cloudClient.getZkStateReader().updateClusterState(true);
+
+      List<Replica> activeReps = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+      if (activeReps.size() >= 2) break;
+      Thread.sleep(1000);
+    }
+
+    List<Replica> participatingReplicas = getActiveOrRecoveringReplicas(testCollectionName, "shard1");
+    assertTrue("Expected 2 of 3 replicas to be active but only found "+
+            participatingReplicas.size()+"; "+participatingReplicas+"; clusterState: "+
+            printClusterStateInfo(testCollectionName),
+        participatingReplicas.size() >= 2);
+
+    sendDoc(6);
+
+    Set<String> replicasToCheck = new HashSet<>();
+    for (Replica stillUp : participatingReplicas)
+      replicasToCheck.add(stillUp.getName());
+    waitToSeeReplicasActive(testCollectionName, "shard1", replicasToCheck, 20);
+    assertDocsExistInAllReplicas(participatingReplicas, testCollectionName, 1, 6);
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
+  }
+}

Modified: lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java?rev=1627347&r1=1627346&r2=1627347&view=diff
==============================================================================
--- lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java (original)
+++ lucene/dev/trunk/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java Wed Sep 24 15:21:47 2014
@@ -26,7 +26,9 @@ import static org.apache.solr.common.clo
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.net.URI;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -81,6 +83,8 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.noggit.CharArr;
+import org.noggit.JSONWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +127,8 @@ public abstract class AbstractFullDistri
   private boolean cloudInit;
   protected boolean checkCreatedVsState;
   protected boolean useJettyDataDir = true;
+
+  protected Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
   
   public static class CloudJettyRunner {
     public JettySolrRunner jetty;
@@ -515,6 +521,77 @@ public abstract class AbstractFullDistri
     return jetty;
   }
 
+  /**
+   * Creates a JettySolrRunner with a socket proxy sitting in front of the Jetty server,
+   * which gives us the ability to simulate network partitions without having to fuss
+   * with IPTables.
+   */
+  public JettySolrRunner createProxiedJetty(File solrHome, String dataDir,
+                                     String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception {
+
+    JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
+        0, solrConfigOverride, schemaOverride, false,
+        getExtraServlets(), sslConfig, getExtraRequestFilters());
+    jetty.setShards(shardList);
+    jetty.setDataDir(getDataDir(dataDir));
+
+    // setup to proxy Http requests to this server unless it is the control
+    // server
+    int proxyPort = getNextAvailablePort();
+    jetty.setProxyPort(proxyPort);
+    jetty.start();
+
+    // create a socket proxy for the jetty server ...
+    SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
+    proxies.put(proxy.getUrl(), proxy);
+
+    return jetty;
+  }
+
+  protected JettySolrRunner getJettyOnPort(int port) {
+    JettySolrRunner theJetty = null;
+    for (JettySolrRunner jetty : jettys) {
+      if (port == jetty.getLocalPort()) {
+        theJetty = jetty;
+        break;
+      }
+    }
+
+    if (theJetty == null) {
+      if (controlJetty.getLocalPort() == port) {
+        theJetty = controlJetty;
+      }
+    }
+
+    if (theJetty == null)
+      fail("Not able to find JettySolrRunner for port: "+port);
+
+    return theJetty;
+  }
+
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy proxy = proxies.get(baseUrl.toURI());
+    if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      proxy = proxies.get(baseUrl.toURI());
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", proxy);
+    return proxy;
+  }
+
+  protected int getNextAvailablePort() throws Exception {
+    int port = -1;
+    try (ServerSocket s = new ServerSocket(0)) {
+      port = s.getLocalPort();
+    }
+    return port;
+  }
+
   private File getRelativeSolrHomePath(File solrHome) {
     String path = SolrResourceLoader.normalizeDir(new File(".").getAbsolutePath());
     String base = new File(solrHome.getPath()).getAbsolutePath();
@@ -1467,6 +1544,13 @@ public abstract class AbstractFullDistri
     
     System.clearProperty("zkHost");
     System.clearProperty("numShards");
+
+    // close socket proxies after super.tearDown
+    if (!proxies.isEmpty()) {
+      for (SocketProxy proxy : proxies.values()) {
+        proxy.close();
+      }
+    }
   }
   
   @Override
@@ -1860,7 +1944,23 @@ public abstract class AbstractFullDistri
   }  
   
   protected String printClusterStateInfo() throws Exception {
+    return printClusterStateInfo(null);
+  }
+
+  protected String printClusterStateInfo(String collection) throws Exception {
     cloudClient.getZkStateReader().updateClusterState(true);
-    return String.valueOf(cloudClient.getZkStateReader().getClusterState());
-  }   
+    String cs = null;
+    ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+    if (collection != null) {
+      cs = clusterState.getCollection(collection).toString();
+    } else {
+      Map<String,DocCollection> map = new HashMap<String,DocCollection>();
+      for (String coll : clusterState.getCollections())
+        map.put(coll, clusterState.getCollection(coll));
+      CharArr out = new CharArr();
+      new JSONWriter(out, 2).write(map);
+      cs = out.toString();
+    }
+    return cs;
+  }
 }