You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/03/15 19:41:26 UTC

svn commit: r1666826 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/update/processor/ solr/core/src/test/org/apache/solr/cloud/ solr/solrj/ solr/solrj/src/java/org/apache/solr/common/cloud/

Author: shalin
Date: Sun Mar 15 18:41:25 2015
New Revision: 1666826

URL: http://svn.apache.org/r1666826
Log:
SOLR-7109: Indexing threads stuck during network partition can put leader into down state

Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
    lucene/dev/branches/branch_5x/solr/solrj/   (props changed)
    lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Sun Mar 15 18:41:25 2015
@@ -185,6 +185,9 @@ Bug Fixes
 * SOLR-6682: Fix response when using EnumField with StatsComponent
   (Xu Zhang via hossman)
 
+* SOLR-7109: Indexing threads stuck during network partition can put leader into down state.
+  (Mark Miller, Anshum Gupta, Ramkumar Aiyengar, yonik, shalin)
+
 Optimizations
 ----------------------
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Mar 15 18:41:25 2015
@@ -402,7 +402,7 @@ final class ShardLeaderElectionContext e
                                                     120,
                                                     coreNodeName);
               zkController.ensureReplicaInLeaderInitiatedRecovery(
-                  collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
+                  collection, shardId, coreNodeProps, false, coreNodeName);
               
               ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
               executor.execute(lirThread);

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Sun Mar 15 18:41:25 2015
@@ -226,8 +226,8 @@ public class LeaderInitiatedRecoveryThre
                         // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
                         log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
                             + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
-                        zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
-                            shardId, replicaUrl, nodeProps, true); // force republish state to "down"
+                        // force republish state to "down"
+                        zkController.ensureReplicaInLeaderInitiatedRecovery(collection, shardId, nodeProps, true, leaderCoreNodeName);
                       }
                     }
                     break;

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Mar 15 18:41:25 2015
@@ -68,6 +68,7 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -169,7 +170,7 @@ public final class ZkController {
       return true;
     }
   }
-  private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<ContextKey, ElectionContext>());
+  private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
   
   private final SolrZkClient zkClient;
   private final ZkCmdExecutor cmdExecutor;
@@ -1135,10 +1136,10 @@ public final class ZkController {
     if (!ZkStateReader.DOWN.equals(state)) {
       String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
       if (lirState != null) {
-        if ("active".equals(state)) {
+        if (ZkStateReader.ACTIVE.equals(state)) {
           // trying to become active, so leader-initiated state must be recovering
           if (ZkStateReader.RECOVERING.equals(lirState)) {
-            updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE);
+            updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null);
           } else if (ZkStateReader.DOWN.equals(lirState)) {
             throw new SolrException(ErrorCode.INVALID_STATE, 
                 "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
@@ -1146,13 +1147,13 @@ public final class ZkController {
         } else if (ZkStateReader.RECOVERING.equals(state)) {
           // if it is currently DOWN, then trying to enter into recovering state is good
           if (ZkStateReader.DOWN.equals(lirState)) {
-            updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING);
+            updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null);
           }
         }
       }
     }
     
-    Map<String, Object> props = new HashMap<String, Object>();
+    Map<String, Object> props = new HashMap<>();
     props.put(Overseer.QUEUE_OPERATION, "state");
     props.put(ZkStateReader.STATE_PROP, state);
     props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
@@ -1891,9 +1892,11 @@ public final class ZkController {
    * to it.
    */
   public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection, 
-      final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState) 
+      final String shardId, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState, String leaderCoreNodeName)
           throws KeeperException, InterruptedException 
-  {    
+  {
+    final String replicaUrl = replicaCoreProps.getCoreUrl();
+
     if (collection == null)
       throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
 
@@ -1924,7 +1927,7 @@ public final class ZkController {
       // we only really need to try to send the recovery command if the node itself is "live"
       if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
         // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN);
+        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName);
         replicasInLeaderInitiatedRecovery.put(replicaUrl,
             getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
         log.info("Put replica core={} coreNodeName={} on "+
@@ -2020,7 +2023,8 @@ public final class ZkController {
     return stateObj;
   }
   
-  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
+  private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
+                                                  String leaderCoreNodeName) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
           "; shardId="+shardId+"; coreNodeName="+coreNodeName);
@@ -2034,7 +2038,7 @@ public final class ZkController {
       try {
         zkClient.delete(znodePath, -1, false);
       } catch (Exception justLogIt) {
-        log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
+        log.warn("Failed to delete znode " + znodePath, justLogIt);
       }
       return;
     }
@@ -2054,24 +2058,62 @@ public final class ZkController {
       stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
 
     byte[] znodeData = ZkStateReader.toJSON(stateObj);
-    boolean retryOnConnLoss = true; // be a little more robust when trying to write data
+
     try {
-      if (zkClient.exists(znodePath, retryOnConnLoss)) {
-        zkClient.setData(znodePath, znodeData, retryOnConnLoss);
-      } else {
-        zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+      if (ZkStateReader.DOWN.equals(state)) {
+        markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
+      } else  {
+        if (zkClient.exists(znodePath, true)) {
+          zkClient.setData(znodePath, znodeData, true);
+        } else {
+          zkClient.makePath(znodePath, znodeData, true);
+        }
       }
       log.info("Wrote "+state+" to "+znodePath);
     } catch (Exception exc) {
       if (exc instanceof SolrException) {
         throw (SolrException)exc;
       } else {
-        throw new SolrException(ErrorCode.SERVER_ERROR, 
-            "Failed to update data to "+state+" for znode: "+znodePath, exc);        
+        throw new SolrException(ErrorCode.SERVER_ERROR,
+            "Failed to update data to "+state+" for znode: "+znodePath, exc);
       }
     }
   }
-  
+
+  /**
+   * we use ZK's multi-transactional semantics to ensure that we are able to
+   * publish a replica as 'down' only if our leader election node still exists
+   * in ZK. This ensures that a long running network partition caused by GC etc
+   * doesn't let us mark a node as down *after* we've already lost our session
+   */
+  private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
+                                       String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+    String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
+    if (leaderSeqPath == null) {
+      throw new SolrException(ErrorCode.SERVER_ERROR,
+          "Failed to update data to 'down' for znode: " + znodePath +
+              " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
+    }
+    if (zkClient.exists(znodePath, true)) {
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.setData(znodePath, znodeData, -1));
+      zkClient.multi(ops, true);
+    } else {
+      String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
+      try {
+        zkClient.makePath(parentZNodePath, true);
+      } catch (KeeperException.NodeExistsException nee) {
+        // if it exists, that's great!
+      }
+      List<Op> ops = new ArrayList<>(2);
+      ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+      ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
+          CreateMode.PERSISTENT));
+      zkClient.multi(ops, true);
+    }
+  }
+
   public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
     return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
   }  
@@ -2309,4 +2351,9 @@ public final class ZkController {
     };
   }
 
+  public String getLeaderSeqPath(String collection, String coreNodeName) {
+    ContextKey key = new ContextKey(collection, coreNodeName);
+    ElectionContext context = electionContexts.get(key);
+    return context != null ? context.leaderSeqPath : null;
+  }
 }

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Mar 15 18:41:25 2015
@@ -872,9 +872,9 @@ public class DistributedUpdateProcessor
             sendRecoveryCommand =
                 zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
                     shardId,
-                    replicaUrl,
                     stdNode.getNodeProps(),
-                    false);
+                    false,
+                    leaderCoreNodeName);
 
             // we want to try more than once, ~10 minutes
             if (sendRecoveryCommand) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Sun Mar 15 18:41:25 2015
@@ -146,7 +146,7 @@ public class HttpPartitionTest extends A
     String replicaUrl = replicaCoreNodeProps.getCoreUrl();
 
     assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
-    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaUrl, replicaCoreNodeProps, false));
+    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, false, leader.getName()));
     assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
     Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Mar 15 18:41:25 2015
@@ -19,8 +19,10 @@ package org.apache.solr.cloud;
 
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.core.CloudConfig;
@@ -244,6 +246,65 @@ public class ZkControllerTest extends So
     }
   }
 
+  public void testEnsureReplicaInLeaderInitiatedRecovery() throws Exception  {
+    String zkDir = createTempDir("testEnsureReplicaInLeaderInitiatedRecovery").toFile().getAbsolutePath();
+    CoreContainer cc = null;
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+    try {
+      server.run();
+
+      AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+      AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+      cc = getCoreContainer();
+      ZkController zkController = null;
+
+      try {
+        CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
+        zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
+
+          @Override
+          public List<CoreDescriptor> getCurrentDescriptors() {
+            // do nothing
+            return null;
+          }
+        });
+        HashMap<String, Object> propMap = new HashMap<>();
+        propMap.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1:8983/solr");
+        propMap.put(ZkStateReader.CORE_NAME_PROP, "replica1");
+        propMap.put(ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
+        Replica replica = new Replica("replica1", propMap);
+        try {
+          // this method doesn't throw exception when node isn't leader
+          zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
+              new ZkCoreNodeProps(replica), false, "non_existent_leader");
+          fail("ZkController should not write LIR state for node which is not leader");
+        } catch (Exception e) {
+          assertNull("ZkController should not write LIR state for node which is not leader",
+              zkController.getLeaderInitiatedRecoveryState("c1", "shard1", "replica1"));
+        }
+      } finally {
+        if (zkController != null)
+          zkController.close();
+      }
+    } finally {
+      if (cc != null) {
+        cc.shutdown();
+      }
+      server.shutdown();
+    }
+  }
+
+  /*
+  Test that:
+  1) LIR state to 'down' is not set unless publishing node is a leader
+    1a) Test that leader can publish when LIR node already exists in zk
+    1b) Test that leader can publish when LIR node does not exist
+  2) LIR state to 'active' or 'recovery' can be set regardless of whether publishing
+    node is leader or not
+   */
+
   private CoreContainer getCoreContainer() {
     return new MockCoreContainer();
   }

Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sun Mar 15 18:41:25 2015
@@ -29,6 +29,8 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -560,6 +562,19 @@ public class SolrZkClient implements Clo
     return setData(path, data, retryOnConnLoss);
   }
 
+  public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException  {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(new ZkOperation() {
+        @Override
+        public List<OpResult> execute() throws KeeperException, InterruptedException {
+          return keeper.multi(ops);
+        }
+      });
+    } else {
+      return keeper.multi(ops);
+    }
+  }
+
   /**
    * Fills string with printout of current ZooKeeper layout.
    */
@@ -740,4 +755,7 @@ public class SolrZkClient implements Clo
     return zkServerAddress;
   }
 
+  public ZkACLProvider getZkACLProvider() {
+    return zkACLProvider;
+  }
 }