You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2014/10/02 13:31:17 UTC

svn commit: r1628945 - in /lucene/dev/trunk/solr: CHANGES.txt core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java

Author: shalin
Date: Thu Oct  2 11:31:16 2014
New Revision: 1628945

URL: http://svn.apache.org/r1628945
Log:
SOLR-6530: Commits under network partitions can put any node in down state

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/CHANGES.txt
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1628945&r1=1628944&r2=1628945&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Thu Oct  2 11:31:16 2014
@@ -200,6 +200,9 @@ Bug Fixes
 
 * SOLR-6511: Fencepost error in LeaderInitiatedRecoveryThread (Timothy Potter)
 
+* SOLR-6530: Commits under network partitions can put any node in down state.
+  (Ramkumar Aiyengar, Alan Woodward, Mark Miller, shalin)
+
 
 Other Changes
 ----------------------

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1628945&r1=1628944&r2=1628945&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Oct  2 11:31:16 2014
@@ -808,6 +808,11 @@ public class DistributedUpdateProcessor 
       if (phase != DistribPhase.FROMLEADER)
         continue; // don't have non-leaders try to recovery other nodes
 
+      // commits are special -- they can run on any node irrespective of whether it is a leader or not
+      // we don't want to run recovery on a node which missed a commit command
+      if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null)
+        continue;
+
       final String replicaUrl = error.req.node.getUrl();
 
       // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
@@ -839,7 +844,17 @@ public class DistributedUpdateProcessor 
               " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
         }
 
-        if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName)) {
+        List<ZkCoreNodeProps> myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
+            cloudDesc.getShardId(), cloudDesc.getCoreNodeName());
+        boolean foundErrorNodeInReplicaList = false;
+        for (ZkCoreNodeProps replicaProp : myReplicas) {
+          if (((Replica) replicaProp.getNodeProps()).getName().equals(((Replica)stdNode.getNodeProps().getNodeProps()).getName()))  {
+            foundErrorNodeInReplicaList = true;
+            break;
+          }
+        }
+
+        if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) && foundErrorNodeInReplicaList) {
           try {
             // if false, then the node is probably not "live" anymore
             sendRecoveryCommand =
@@ -866,10 +881,16 @@ public class DistributedUpdateProcessor 
             // will go ahead and try to send the recovery command once after this error
           }
         } else {
-          // not the leader anymore maybe?
+          // either this core is no longer the leader, or the error'd node is not one of its replicas
           sendRecoveryCommand = false;
-          log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
-              shardId+", no request recovery command will be sent!");
+          if (!foundErrorNodeInReplicaList) {
+            log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
+                shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
+                "No request recovery command will be sent!");
+          } else  {
+            log.warn("Core "+cloudDesc.getCoreNodeName()+" is no longer the leader for "+collection+" "+
+                shardId+", no request recovery command will be sent!");
+          }
         }
       } // else not a StdNode, recovery command still gets sent once
             

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java?rev=1628945&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java Thu Oct  2 11:31:16 2014
@@ -0,0 +1,163 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrServer;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.junit.After;
+import org.junit.Before;
+
+public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest {
+
+  private static final long sleepMsBeforeHealPartition = 2000L;
+
+  public LeaderInitiatedRecoveryOnCommitTest() {
+    super();
+    sliceCount = 1;
+    shardCount = 4;
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    System.setProperty("numShards", Integer.toString(sliceCount));
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    System.clearProperty("numShards");
+
+    try {
+      super.tearDown();
+    } catch (Exception exc) {
+    }
+
+    resetExceptionIgnores();
+
+    // close socket proxies after super.tearDown
+    if (!proxies.isEmpty()) {
+      for (SocketProxy proxy : proxies.values()) {
+        proxy.close();
+      }
+    }
+  }
+
+  @Override
+  public void doTest() throws Exception {
+    oneShardTest();
+    multiShardTest();
+  }
+
+  private void multiShardTest() throws Exception {
+    // create a collection that has 2 shards and 2 replicas per shard
+    String testCollectionName = "c8n_2x2_commits";
+    createCollection(testCollectionName, 2, 2, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
+    assertTrue("Expected 1 replicas for collection " + testCollectionName
+            + " but found " + notLeaders.size() + "; clusterState: "
+            + printClusterStateInfo(),
+        notLeaders.size() == 1);
+
+    // let's put the shard1 leader in its own partition, no replicas can contact it now
+    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    SocketProxy leaderProxy = getProxyForReplica(leader);
+    leaderProxy.close();
+
+    // let's find the leader of shard2 and ask it to commit
+    Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
+    HttpSolrServer server = new HttpSolrServer(ZkCoreNodeProps.getCoreUrl(shard2Leader.getStr("base_url"), shard2Leader.getStr("core")));
+    server.commit();
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    cloudClient.getZkStateReader().updateClusterState(true); // get the latest state
+    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertEquals("Leader was not active", "active", leader.getStr("state"));
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
+  }
+
+  private void oneShardTest() throws Exception {
+    // create a collection that has 1 shard and 3 replicas
+    String testCollectionName = "c8n_1x3_commits";
+    createCollection(testCollectionName, 1, 3, 1);
+    cloudClient.setDefaultCollection(testCollectionName);
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
+    assertTrue("Expected 2 replicas for collection " + testCollectionName
+            + " but found " + notLeaders.size() + "; clusterState: "
+            + printClusterStateInfo(),
+        notLeaders.size() == 2);
+
+    // let's put the leader in its own partition, no replicas can contact it now
+    Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    SocketProxy leaderProxy = getProxyForReplica(leader);
+    leaderProxy.close();
+
+    Replica replica = notLeaders.get(0);
+    HttpSolrServer server = new HttpSolrServer(ZkCoreNodeProps.getCoreUrl(replica.getStr("base_url"), replica.getStr("core")));
+    server.commit();
+
+    Thread.sleep(sleepMsBeforeHealPartition);
+
+    cloudClient.getZkStateReader().updateClusterState(true); // get the latest state
+    leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+    assertEquals("Leader was not active", "active", leader.getStr("state"));
+
+    // try to clean up
+    try {
+      CollectionAdminRequest req = new CollectionAdminRequest.Delete();
+      req.setCollectionName(testCollectionName);
+      req.process(cloudClient);
+    } catch (Exception e) {
+      // don't fail the test
+      log.warn("Could not delete collection {} after test completed", testCollectionName);
+    }
+  }
+
+  /**
+   * Overrides the parent implementation to install a SocketProxy in front of the Jetty server.
+   */
+  @Override
+  public JettySolrRunner createJetty(File solrHome, String dataDir,
+                                     String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception {
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+  }
+
+}
\ No newline at end of file