You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/03/15 19:41:26 UTC
svn commit: r1666826 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/update/processor/
solr/core/src/test/org/apache/solr/cloud/ solr/solrj/
solr/solrj/src/java/org/apache/solr/common/cloud/
Author: shalin
Date: Sun Mar 15 18:41:25 2015
New Revision: 1666826
URL: http://svn.apache.org/r1666826
Log:
SOLR-7109: Indexing threads stuck during network partition can put leader into down state
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
lucene/dev/branches/branch_5x/solr/solrj/ (props changed)
lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Sun Mar 15 18:41:25 2015
@@ -185,6 +185,9 @@ Bug Fixes
* SOLR-6682: Fix response when using EnumField with StatsComponent
(Xu Zhang via hossman)
+* SOLR-7109: Indexing threads stuck during network partition can put leader into down state.
+ (Mark Miller, Anshum Gupta, Ramkumar Aiyengar, yonik, shalin)
+
Optimizations
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Sun Mar 15 18:41:25 2015
@@ -402,7 +402,7 @@ final class ShardLeaderElectionContext e
120,
coreNodeName);
zkController.ensureReplicaInLeaderInitiatedRecovery(
- collection, shardId, coreNodeProps.getCoreUrl(), coreNodeProps, false);
+ collection, shardId, coreNodeProps, false, coreNodeName);
ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
executor.execute(lirThread);
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Sun Mar 15 18:41:25 2015
@@ -226,8 +226,8 @@ public class LeaderInitiatedRecoveryThre
// so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
+ " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
- zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
- shardId, replicaUrl, nodeProps, true); // force republish state to "down"
+ // force republish state to "down"
+ zkController.ensureReplicaInLeaderInitiatedRecovery(collection, shardId, nodeProps, true, leaderCoreNodeName);
}
}
break;
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Sun Mar 15 18:41:25 2015
@@ -68,6 +68,7 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
@@ -169,7 +170,7 @@ public final class ZkController {
return true;
}
}
- private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<ContextKey, ElectionContext>());
+ private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
private final SolrZkClient zkClient;
private final ZkCmdExecutor cmdExecutor;
@@ -1135,10 +1136,10 @@ public final class ZkController {
if (!ZkStateReader.DOWN.equals(state)) {
String lirState = getLeaderInitiatedRecoveryState(collection, shardId, coreNodeName);
if (lirState != null) {
- if ("active".equals(state)) {
+ if (ZkStateReader.ACTIVE.equals(state)) {
// trying to become active, so leader-initiated state must be recovering
if (ZkStateReader.RECOVERING.equals(lirState)) {
- updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE);
+ updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null);
} else if (ZkStateReader.DOWN.equals(lirState)) {
throw new SolrException(ErrorCode.INVALID_STATE,
"Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
@@ -1146,13 +1147,13 @@ public final class ZkController {
} else if (ZkStateReader.RECOVERING.equals(state)) {
// if it is currently DOWN, then trying to enter into recovering state is good
if (ZkStateReader.DOWN.equals(lirState)) {
- updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING);
+ updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null);
}
}
}
}
- Map<String, Object> props = new HashMap<String, Object>();
+ Map<String, Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state);
props.put(ZkStateReader.BASE_URL_PROP, getBaseUrl());
@@ -1891,9 +1892,11 @@ public final class ZkController {
* to it.
*/
public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection,
- final String shardId, final String replicaUrl, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState)
+ final String shardId, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState, String leaderCoreNodeName)
throws KeeperException, InterruptedException
- {
+ {
+ final String replicaUrl = replicaCoreProps.getCoreUrl();
+
if (collection == null)
throw new IllegalArgumentException("collection parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
@@ -1924,7 +1927,7 @@ public final class ZkController {
// we only really need to try to send the recovery command if the node itself is "live"
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
// create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
- updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN);
+ updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName);
replicasInLeaderInitiatedRecovery.put(replicaUrl,
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
log.info("Put replica core={} coreNodeName={} on "+
@@ -2020,7 +2023,8 @@ public final class ZkController {
return stateObj;
}
- private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state) {
+ private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
+ String leaderCoreNodeName) {
if (collection == null || shardId == null || coreNodeName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
"; shardId="+shardId+"; coreNodeName="+coreNodeName);
@@ -2034,7 +2038,7 @@ public final class ZkController {
try {
zkClient.delete(znodePath, -1, false);
} catch (Exception justLogIt) {
- log.warn("Failed to delete znode "+znodePath+" due to: "+justLogIt);
+ log.warn("Failed to delete znode " + znodePath, justLogIt);
}
return;
}
@@ -2054,24 +2058,62 @@ public final class ZkController {
stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
byte[] znodeData = ZkStateReader.toJSON(stateObj);
- boolean retryOnConnLoss = true; // be a little more robust when trying to write data
+
try {
- if (zkClient.exists(znodePath, retryOnConnLoss)) {
- zkClient.setData(znodePath, znodeData, retryOnConnLoss);
- } else {
- zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
+ if (ZkStateReader.DOWN.equals(state)) {
+ markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
+ } else {
+ if (zkClient.exists(znodePath, true)) {
+ zkClient.setData(znodePath, znodeData, true);
+ } else {
+ zkClient.makePath(znodePath, znodeData, true);
+ }
}
log.info("Wrote "+state+" to "+znodePath);
} catch (Exception exc) {
if (exc instanceof SolrException) {
throw (SolrException)exc;
} else {
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Failed to update data to "+state+" for znode: "+znodePath, exc);
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Failed to update data to "+state+" for znode: "+znodePath, exc);
}
}
}
-
+
+ /**
+ * we use ZK's multi-transactional semantics to ensure that we are able to
+ * publish a replica as 'down' only if our leader election node still exists
+ * in ZK. This ensures that a long running network partition caused by GC etc
+ * doesn't let us mark a node as down *after* we've already lost our session
+ */
+ private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
+ String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+ String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
+ if (leaderSeqPath == null) {
+ throw new SolrException(ErrorCode.SERVER_ERROR,
+ "Failed to update data to 'down' for znode: " + znodePath +
+ " because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
+ }
+ if (zkClient.exists(znodePath, true)) {
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+ ops.add(Op.setData(znodePath, znodeData, -1));
+ zkClient.multi(ops, true);
+ } else {
+ String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
+ try {
+ zkClient.makePath(parentZNodePath, true);
+ } catch (KeeperException.NodeExistsException nee) {
+ // if it exists, that's great!
+ }
+ List<Op> ops = new ArrayList<>(2);
+ ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
+ ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
+ CreateMode.PERSISTENT));
+ zkClient.multi(ops, true);
+ }
+ }
+
public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
return "/collections/"+collection+"/leader_initiated_recovery/"+shardId;
}
@@ -2309,4 +2351,9 @@ public final class ZkController {
};
}
+ public String getLeaderSeqPath(String collection, String coreNodeName) {
+ ContextKey key = new ContextKey(collection, coreNodeName);
+ ElectionContext context = electionContexts.get(key);
+ return context != null ? context.leaderSeqPath : null;
+ }
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Sun Mar 15 18:41:25 2015
@@ -872,9 +872,9 @@ public class DistributedUpdateProcessor
sendRecoveryCommand =
zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
shardId,
- replicaUrl,
stdNode.getNodeProps(),
- false);
+ false,
+ leaderCoreNodeName);
// we want to try more than once, ~10 minutes
if (sendRecoveryCommand) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Sun Mar 15 18:41:25 2015
@@ -146,7 +146,7 @@ public class HttpPartitionTest extends A
String replicaUrl = replicaCoreNodeProps.getCoreUrl();
assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
- assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaUrl, replicaCoreNodeProps, false));
+ assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, false, leader.getName()));
assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Sun Mar 15 18:41:25 2015
@@ -19,8 +19,10 @@ package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkConfigManager;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CloudConfig;
@@ -244,6 +246,65 @@ public class ZkControllerTest extends So
}
}
+ public void testEnsureReplicaInLeaderInitiatedRecovery() throws Exception {
+ String zkDir = createTempDir("testEnsureReplicaInLeaderInitiatedRecovery").toFile().getAbsolutePath();
+ CoreContainer cc = null;
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+ try {
+ server.run();
+
+ AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
+ AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
+
+ cc = getCoreContainer();
+ ZkController zkController = null;
+
+ try {
+ CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
+ zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
+
+ @Override
+ public List<CoreDescriptor> getCurrentDescriptors() {
+ // do nothing
+ return null;
+ }
+ });
+ HashMap<String, Object> propMap = new HashMap<>();
+ propMap.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1:8983/solr");
+ propMap.put(ZkStateReader.CORE_NAME_PROP, "replica1");
+ propMap.put(ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
+ Replica replica = new Replica("replica1", propMap);
+ try {
+ // this method doesn't throw exception when node isn't leader
+ zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
+ new ZkCoreNodeProps(replica), false, "non_existent_leader");
+ fail("ZkController should not write LIR state for node which is not leader");
+ } catch (Exception e) {
+ assertNull("ZkController should not write LIR state for node which is not leader",
+ zkController.getLeaderInitiatedRecoveryState("c1", "shard1", "replica1"));
+ }
+ } finally {
+ if (zkController != null)
+ zkController.close();
+ }
+ } finally {
+ if (cc != null) {
+ cc.shutdown();
+ }
+ server.shutdown();
+ }
+ }
+
+ /*
+ Test that:
+ 1) LIR state to 'down' is not set unless publishing node is a leader
+ 1a) Test that leader can publish when LIR node already exists in zk
+ 1b) Test that leader can publish when LIR node does not exist
+ 2) LIR state to 'active' or 'recovery' can be set regardless of whether publishing
+ node is leader or not
+ */
+
private CoreContainer getCoreContainer() {
return new MockCoreContainer();
}
Modified: lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java?rev=1666826&r1=1666825&r2=1666826&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java (original)
+++ lucene/dev/branches/branch_5x/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java Sun Mar 15 18:41:25 2015
@@ -29,6 +29,8 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.NotEmptyException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -560,6 +562,19 @@ public class SolrZkClient implements Clo
return setData(path, data, retryOnConnLoss);
}
+ public List<OpResult> multi(final Iterable<Op> ops, boolean retryOnConnLoss) throws InterruptedException, KeeperException {
+ if (retryOnConnLoss) {
+ return zkCmdExecutor.retryOperation(new ZkOperation() {
+ @Override
+ public List<OpResult> execute() throws KeeperException, InterruptedException {
+ return keeper.multi(ops);
+ }
+ });
+ } else {
+ return keeper.multi(ops);
+ }
+ }
+
/**
* Fills string with printout of current ZooKeeper layout.
*/
@@ -740,4 +755,7 @@ public class SolrZkClient implements Clo
return zkServerAddress;
}
+ public ZkACLProvider getZkACLProvider() {
+ return zkACLProvider;
+ }
}