You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/09/09 20:07:46 UTC
svn commit: r1702067 - in /lucene/dev/trunk/solr: ./
core/src/java/org/apache/solr/cloud/
core/src/java/org/apache/solr/update/processor/
core/src/test/org/apache/solr/cloud/
Author: shalin
Date: Wed Sep 9 18:07:45 2015
New Revision: 1702067
URL: http://svn.apache.org/r1702067
Log:
SOLR-7819: ZK connection loss or session timeout do not stall indexing threads anymore and LIR activity is moved to a background thread
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java (with props)
Modified:
lucene/dev/trunk/solr/CHANGES.txt
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
Modified: lucene/dev/trunk/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/CHANGES.txt?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/CHANGES.txt (original)
+++ lucene/dev/trunk/solr/CHANGES.txt Wed Sep 9 18:07:45 2015
@@ -182,6 +182,10 @@ Bug Fixes
* SOLR-8001: Fixed bugs in field(foo,min) and field(foo,max) when some docs have no values
(David Smiley, hossman)
+* SOLR-7819: ZK connection loss or session timeout do not stall indexing threads anymore. All activity
+ related to leader initiated recovery is performed by a dedicated LIR thread in the background.
+ (Ramkumar Aiyengar, shalin)
+
Optimizations
----------------------
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java Wed Sep 9 18:07:45 2015
@@ -4,7 +4,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
@@ -467,20 +466,9 @@ final class ShardLeaderElectionContext e
}
}
- LeaderInitiatedRecoveryThread lirThread =
- new LeaderInitiatedRecoveryThread(zkController,
- cc,
- collection,
- shardId,
- coreNodeProps,
- 120,
- coreNodeName);
- zkController.ensureReplicaInLeaderInitiatedRecovery(
+ zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, coreNodeName,
- false /* forcePublishState */, true /* retryOnConnLoss */);
-
- ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
- executor.execute(lirThread);
+ false /* forcePublishState */);
}
}
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java Wed Sep 9 18:07:45 2015
@@ -8,9 +8,12 @@ import org.apache.solr.common.SolrExcept
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
+import org.apache.zookeeper.KeeperException;
import org.apache.solr.util.RTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,24 +78,108 @@ public class LeaderInitiatedRecoveryThre
public void run() {
RTimer timer = new RTimer();
- try {
- sendRecoveryCommandWithRetry();
- } catch (Exception exc) {
- log.error(getName()+" failed due to: "+exc, exc);
- if (exc instanceof SolrException) {
- throw (SolrException)exc;
- } else {
- throw new SolrException(ErrorCode.SERVER_ERROR, exc);
+
+ String replicaCoreName = nodeProps.getCoreName();
+ String replicaCoreNodeName = ((Replica) nodeProps.getNodeProps()).getName();
+ String replicaNodeName = nodeProps.getNodeName();
+ final String replicaUrl = nodeProps.getCoreUrl();
+
+ if (!zkController.isReplicaInRecoveryHandling(replicaUrl)) {
+ throw new SolrException(ErrorCode.INVALID_STATE, "Replica: " + replicaUrl + " should have been marked under leader initiated recovery in ZkController but wasn't.");
+ }
+
+ boolean sendRecoveryCommand = publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, false);
+
+ if (sendRecoveryCommand) {
+ try {
+ sendRecoveryCommandWithRetry();
+ } catch (Exception exc) {
+ log.error(getName()+" failed due to: "+exc, exc);
+ if (exc instanceof SolrException) {
+ throw (SolrException)exc;
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, exc);
+ }
+ } finally {
+ zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
}
+ } else {
+ // replica is no longer in recovery on this node (may be handled on another node)
+ zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
}
log.info("{} completed successfully after running for {}ms", getName(), timer.getTime());
}
-
+
+ public boolean publishDownState(String replicaCoreName, String replicaCoreNodeName, String replicaNodeName, String replicaUrl, boolean forcePublishState) {
+ boolean sendRecoveryCommand = true;
+ boolean publishDownState = false;
+
+ if (zkController.getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+ try {
+ // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
+ updateLIRState(replicaCoreNodeName);
+
+ log.info("Put replica core={} coreNodeName={} on " +
+ replicaNodeName + " into leader-initiated recovery.", replicaCoreName, replicaCoreNodeName);
+ publishDownState = true;
+ } catch (Exception e) {
+ Throwable setLirZnodeFailedCause = SolrException.getRootCause(e);
+ log.error("Leader failed to set replica " +
+ nodeProps.getCoreUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
+ if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException
+ || setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException
+ || setLirZnodeFailedCause instanceof ZkController.NotLeaderException) {
+ // our session is expired, which means our state is suspect, so don't go
+ // putting other replicas in recovery (see SOLR-6511)
+ sendRecoveryCommand = false;
+ forcePublishState = false; // no need to force publish any state in this case
+ } // else will go ahead and try to send the recovery command once after this error
+ }
+ } else {
+ log.info("Node " + replicaNodeName +
+ " is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
+ replicaCoreName, replicaCoreNodeName);
+ // publishDownState will be false to avoid publishing the "down" state too many times
+ // as many errors can occur together and will each call into this method (SOLR-6189)
+ forcePublishState = false; // no need to force publish the state because replica is not live
+ sendRecoveryCommand = false; // no need to send recovery messages as well
+ }
+
+ try {
+ if (publishDownState || forcePublishState) {
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
+ ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
+ ZkStateReader.BASE_URL_PROP, nodeProps.getBaseUrl(),
+ ZkStateReader.CORE_NAME_PROP, nodeProps.getCoreName(),
+ ZkStateReader.NODE_NAME_PROP, nodeProps.getNodeName(),
+ ZkStateReader.SHARD_ID_PROP, shardId,
+ ZkStateReader.COLLECTION_PROP, collection);
+ log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}",
+ replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
+ zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
+ }
+ } catch (Exception e) {
+ log.error("Could not publish 'down' state for replicaUrl: {}", replicaUrl, e);
+ }
+
+ return sendRecoveryCommand;
+ }
+
+ /*
+ protected scope for testing purposes
+ */
+ protected void updateLIRState(String replicaCoreNodeName) {
+ zkController.updateLeaderInitiatedRecoveryState(collection,
+ shardId,
+ replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName, true);
+ }
+
protected void sendRecoveryCommandWithRetry() throws Exception {
int tries = 0;
long waitBetweenTriesMs = 5000L;
boolean continueTrying = true;
-
+
+ String replicaCoreName = nodeProps.getCoreName();
String recoveryUrl = nodeProps.getBaseUrl();
String replicaNodeName = nodeProps.getNodeName();
String coreNeedingRecovery = nodeProps.getCoreName();
@@ -224,11 +311,8 @@ public class LeaderInitiatedRecoveryThre
// OK, so the replica thinks it is active, but it never ack'd the leader initiated recovery
// so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
- + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
- zkController.ensureReplicaInLeaderInitiatedRecovery(
- collection, shardId, nodeProps, leaderCoreNodeName,
- true /* forcePublishState */, true /* retryOnConnLoss */
- );
+ + " forcing it back to down state to re-run the leader-initiated recovery process; props: " + replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
+ publishDownState(replicaCoreName, replicaCoreNodeName, replicaNodeName, replicaUrl, true);
}
}
break;
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/ZkController.java Wed Sep 9 18:07:45 2015
@@ -17,8 +17,6 @@ package org.apache.solr.cloud;
* limitations under the License.
*/
-import static org.apache.solr.common.cloud.ZkStateReader.*;
-
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
@@ -46,6 +44,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
@@ -98,8 +97,16 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
-import com.google.common.base.Strings;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
/**
* Handle ZooKeeper interactions.
@@ -1198,7 +1205,7 @@ public final class ZkController {
if (state == Replica.State.ACTIVE) {
// trying to become active, so leader-initiated state must be recovering
if (lirState == Replica.State.RECOVERING) {
- updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null);
+ updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.ACTIVE, null, true);
} else if (lirState == Replica.State.DOWN) {
throw new SolrException(ErrorCode.INVALID_STATE,
"Cannot publish state of core '" + cd.getName() + "' as active without recovering first!");
@@ -1206,7 +1213,7 @@ public final class ZkController {
} else if (state == Replica.State.RECOVERING) {
// if it is currently DOWN, then trying to enter into recovering state is good
if (lirState == Replica.State.DOWN) {
- updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null);
+ updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, Replica.State.RECOVERING, null, true);
}
}
}
@@ -1972,8 +1979,9 @@ public final class ZkController {
* to it.
*/
public boolean ensureReplicaInLeaderInitiatedRecovery(
+ final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
- String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
+ String leaderCoreNodeName, boolean forcePublishState)
throws KeeperException, InterruptedException {
final String replicaUrl = replicaCoreProps.getCoreUrl();
@@ -1991,7 +1999,6 @@ public final class ZkController {
// about the same replica having trouble and we only need to send the "needs"
// recovery signal once
boolean nodeIsLive = true;
- boolean publishDownState = false;
String replicaNodeName = replicaCoreProps.getNodeName();
String replicaCoreNodeName = ((Replica) replicaCoreProps.getNodeProps()).getName();
assert replicaCoreNodeName != null : "No core name for replica " + replicaNodeName;
@@ -2003,16 +2010,30 @@ public final class ZkController {
}
}
- // if the replica's state is not DOWN right now, make it so ...
- // we only really need to try to send the recovery command if the node itself is "live"
+ // we only really need to try to start the LIR process if the node itself is "live"
if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
+
+ LeaderInitiatedRecoveryThread lirThread =
+ new LeaderInitiatedRecoveryThread(this,
+ container,
+ collection,
+ shardId,
+ replicaCoreProps,
+ 120,
+ leaderCoreNodeName); // core node name of current leader
+ ExecutorService executor = container.getUpdateShardHandler().getUpdateExecutor();
+ try {
+ MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", replicaCoreProps.getCoreUrl());
+ executor.execute(lirThread);
+ } finally {
+ MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
+ }
+
// create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
- updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, Replica.State.DOWN, leaderCoreNodeName);
replicasInLeaderInitiatedRecovery.put(replicaUrl,
getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
log.info("Put replica core={} coreNodeName={} on " +
replicaNodeName + " into leader-initiated recovery.", replicaCoreProps.getCoreName(), replicaCoreNodeName);
- publishDownState = true;
} else {
nodeIsLive = false; // we really don't need to send the recovery request if the node is NOT live
log.info("Node " + replicaNodeName +
@@ -2023,20 +2044,6 @@ public final class ZkController {
}
}
- if (publishDownState || forcePublishState) {
- String replicaCoreName = replicaCoreProps.getCoreName();
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, "state",
- ZkStateReader.STATE_PROP, Replica.State.DOWN.toString(),
- ZkStateReader.BASE_URL_PROP, replicaCoreProps.getBaseUrl(),
- ZkStateReader.CORE_NAME_PROP, replicaCoreProps.getCoreName(),
- ZkStateReader.NODE_NAME_PROP, replicaCoreProps.getNodeName(),
- ZkStateReader.SHARD_ID_PROP, shardId,
- ZkStateReader.COLLECTION_PROP, collection);
- log.warn("Leader is publishing core={} coreNodeName ={} state={} on behalf of un-reachable replica {}; forcePublishState? " + forcePublishState,
- replicaCoreName, replicaCoreNodeName, Replica.State.DOWN.toString(), replicaUrl);
- overseerJobQueue.offer(Utils.toJSON(m));
- }
-
return nodeIsLive;
}
@@ -2107,8 +2114,8 @@ public final class ZkController {
return stateObj;
}
- private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
- Replica.State state, String leaderCoreNodeName) {
+ public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
+ Replica.State state, String leaderCoreNodeName, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
log.warn("Cannot set leader-initiated recovery state znode to "
+ state.toString() + " using: collection=" + collection
@@ -2121,7 +2128,7 @@ public final class ZkController {
if (state == Replica.State.ACTIVE) {
// since we're marking it active, we don't need this znode anymore, so delete instead of update
try {
- zkClient.delete(znodePath, -1, false);
+ zkClient.delete(znodePath, -1, retryOnConnLoss);
} catch (Exception justLogIt) {
log.warn("Failed to delete znode " + znodePath, justLogIt);
}
@@ -2134,24 +2141,30 @@ public final class ZkController {
} catch (Exception exc) {
log.warn(exc.getMessage(), exc);
}
- if (stateObj == null)
+ if (stateObj == null) {
stateObj = Utils.makeMap();
+ }
stateObj.put(ZkStateReader.STATE_PROP, state.toString());
// only update the createdBy value if it's not set
- if (stateObj.get("createdByNodeName") == null)
- stateObj.put("createdByNodeName", String.valueOf(this.nodeName));
+ if (stateObj.get("createdByNodeName") == null) {
+ stateObj.put("createdByNodeName", this.nodeName);
+ }
+ if (stateObj.get("createdByCoreNodeName") == null && leaderCoreNodeName != null) {
+ stateObj.put("createdByCoreNodeName", leaderCoreNodeName);
+ }
byte[] znodeData = Utils.toJSON(stateObj);
try {
if (state == Replica.State.DOWN) {
- markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData);
+ markShardAsDownIfLeader(collection, shardId, leaderCoreNodeName, znodePath, znodeData, retryOnConnLoss);
} else {
+ // must retry on conn loss otherwise future election attempts may assume wrong LIR state
if (zkClient.exists(znodePath, true)) {
- zkClient.setData(znodePath, znodeData, true);
+ zkClient.setData(znodePath, znodeData, retryOnConnLoss);
} else {
- zkClient.makePath(znodePath, znodeData, true);
+ zkClient.makePath(znodePath, znodeData, retryOnConnLoss);
}
}
log.info("Wrote {} to {}", state.toString(), znodePath);
@@ -2172,22 +2185,23 @@ public final class ZkController {
* doesn't let us mark a node as down *after* we've already lost our session
*/
private void markShardAsDownIfLeader(String collection, String shardId, String leaderCoreNodeName,
- String znodePath, byte[] znodeData) throws KeeperException, InterruptedException {
+ String znodePath, byte[] znodeData,
+ boolean retryOnConnLoss) throws KeeperException, InterruptedException {
String leaderSeqPath = getLeaderSeqPath(collection, leaderCoreNodeName);
if (leaderSeqPath == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR,
+ throw new NotLeaderException(ErrorCode.SERVER_ERROR,
"Failed to update data to 'down' for znode: " + znodePath +
" because the zookeeper leader sequence for leader: " + leaderCoreNodeName + " is null");
}
- if (zkClient.exists(znodePath, true)) {
+ if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
ops.add(Op.setData(znodePath, znodeData, -1));
- zkClient.multi(ops, true);
+ zkClient.multi(ops, retryOnConnLoss);
} else {
String parentZNodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId);
try {
- zkClient.makePath(parentZNodePath, true);
+ zkClient.makePath(parentZNodePath, retryOnConnLoss);
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
@@ -2195,7 +2209,7 @@ public final class ZkController {
ops.add(Op.check(leaderSeqPath, -1)); // version doesn't matter, the seq path is unique
ops.add(Op.create(znodePath, znodeData, zkClient.getZkACLProvider().getACLsToAdd(znodePath),
CreateMode.PERSISTENT));
- zkClient.multi(ops, true);
+ zkClient.multi(ops, retryOnConnLoss);
}
}
@@ -2473,4 +2487,13 @@ public final class ZkController {
ElectionContext context = electionContexts.get(key);
return context != null ? context.leaderSeqPath : null;
}
+
+ /**
+ * Thrown during leader initiated recovery process if current node is not leader
+ */
+ public static class NotLeaderException extends SolrException {
+ public NotLeaderException(ErrorCode code, String msg) {
+ super(code, msg);
+ }
+ }
}
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Wed Sep 9 18:07:45 2015
@@ -17,6 +17,21 @@ package org.apache.solr.update.processor
* limitations under the License.
*/
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -49,7 +64,6 @@ import org.apache.solr.common.params.Upd
import org.apache.solr.common.util.Hash;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
@@ -72,23 +86,6 @@ import org.apache.solr.update.VersionInf
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -836,8 +833,6 @@ public class DistributedUpdateProcessor
break;
}
- int maxTries = 1;
- boolean sendRecoveryCommand = true;
String collection = null;
String shardId = null;
@@ -878,33 +873,24 @@ public class DistributedUpdateProcessor
if (cloudDesc.getCoreNodeName().equals(leaderCoreNodeName) && foundErrorNodeInReplicaList) {
try {
// if false, then the node is probably not "live" anymore
- sendRecoveryCommand =
- zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
- shardId,
- stdNode.getNodeProps(),
- leaderCoreNodeName,
- false /* forcePublishState */,
- false /* retryOnConnLoss */
- );
-
- // we want to try more than once, ~10 minutes
- if (sendRecoveryCommand) {
- maxTries = 120;
- } // else the node is no longer "live" so no need to send any recovery command
+ // and we do not need to send a recovery message
+ Throwable rootCause = SolrException.getRootCause(error.e);
+ log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+ zkController.ensureReplicaInLeaderInitiatedRecovery(
+ req.getCore().getCoreDescriptor().getCoreContainer(),
+ collection,
+ shardId,
+ stdNode.getNodeProps(),
+ leaderCoreNodeName,
+ false /* forcePublishState */
+ );
} catch (Exception exc) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica " +
error.req.node.getUrl() + " state to DOWN due to: " + setLirZnodeFailedCause, setLirZnodeFailedCause);
- if (setLirZnodeFailedCause instanceof KeeperException.SessionExpiredException ||
- setLirZnodeFailedCause instanceof KeeperException.ConnectionLossException) {
- // our session is expired, which means our state is suspect, so don't go
- // putting other replicas in recovery (see SOLR-6511)
- sendRecoveryCommand = false;
- } // else will go ahead and try to send the recovery command once after this error
}
} else {
// not the leader anymore maybe or the error'd node is not my replica?
- sendRecoveryCommand = false;
if (!foundErrorNodeInReplicaList) {
log.warn("Core "+cloudDesc.getCoreNodeName()+" belonging to "+collection+" "+
shardId+", does not have error'd node " + stdNode.getNodeProps().getCoreUrl() + " as a replica. " +
@@ -914,30 +900,6 @@ public class DistributedUpdateProcessor
shardId+", no request recovery command will be sent!");
}
}
- } // else not a StdNode, recovery command still gets sent once
-
- if (!sendRecoveryCommand)
- continue; // the replica is already in recovery handling or is not live
-
- Throwable rootCause = SolrException.getRootCause(error.e);
- log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
-
- // try to send the recovery command to the downed replica in a background thread
- CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
- LeaderInitiatedRecoveryThread lirThread =
- new LeaderInitiatedRecoveryThread(zkController,
- coreContainer,
- collection,
- shardId,
- error.req.node.getNodeProps(),
- maxTries,
- cloudDesc.getCoreNodeName()); // core node name of current leader
- ExecutorService executor = coreContainer.getUpdateShardHandler().getUpdateExecutor();
- try {
- MDC.put("DistributedUpdateProcessor.replicaUrlToRecover", error.req.node.getNodeProps().getCoreUrl());
- executor.execute(lirThread);
- } finally {
- MDC.remove("DistributedUpdateProcessor.replicaUrlToRecover");
}
}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java Wed Sep 9 18:07:45 2015
@@ -140,14 +140,10 @@ public class HttpPartitionTest extends A
ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(notLeader);
String replicaUrl = replicaCoreNodeProps.getCoreUrl();
- assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
- assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, leader.getName(), false, true));
- assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
+ zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, leader.getName(), true);
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
- zkController.removeReplicaFromLeaderInitiatedRecoveryHandling(replicaUrl);
- assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
// test old non-json format handling
SolrZkClient zkClient = zkController.getZkClient();
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java?rev=1702067&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java Wed Sep 9 18:07:45 2015
@@ -0,0 +1,224 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.servlet.SolrDispatchFilter;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Test for {@link LeaderInitiatedRecoveryThread}
+ */
+@SolrTestCaseJ4.SuppressSSL
+public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {
+
+ public TestLeaderInitiatedRecoveryThread() {
+ sliceCount = 1;
+ fixShardCount(2);
+ }
+
+ public void testPublishDownState() throws Exception {
+ waitForRecoveriesToFinish(true);
+
+ final String leaderCoreNodeName = shardToLeaderJetty.get(SHARD1).coreNodeName;
+ final CloudJettyRunner leaderRunner = shardToLeaderJetty.get(SHARD1);
+ SolrDispatchFilter filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
+ ZkController zkController = filter.getCores().getZkController();
+
+ CloudJettyRunner notLeader = null;
+ for (CloudJettyRunner cloudJettyRunner : shardToJetty.get(SHARD1)) {
+ if (cloudJettyRunner != leaderRunner) {
+ notLeader = cloudJettyRunner;
+ break;
+ }
+ }
+ assertNotNull(notLeader);
+ Replica replica = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, notLeader.coreNodeName);
+ ZkCoreNodeProps replicaCoreNodeProps = new ZkCoreNodeProps(replica);
+
+ /*
+ 1. Test that publishDownState throws exception when zkController.isReplicaInRecoveryHandling == false
+ */
+ try {
+ LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+ assertFalse(zkController.isReplicaInRecoveryHandling(replicaCoreNodeProps.getCoreUrl()));
+ thread.run();
+ fail("publishDownState should not have succeeded because replica url is not marked in leader initiated recovery in ZkController");
+ } catch (SolrException e) {
+ assertTrue(e.code() == SolrException.ErrorCode.INVALID_STATE.code);
+ }
+
+
+ /*
+ 2. Test that a non-live replica cannot be put into LIR or down state
+ */
+ LeaderInitiatedRecoveryThread thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+ // kill the replica
+ int children = cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size();
+ ChaosMonkey.stop(notLeader.jetty);
+ TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS);
+ while (!timeOut.hasTimedOut()) {
+ if (children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size()) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+ assertTrue(children > cloudClient.getZkStateReader().getZkClient().getChildren("/live_nodes", null, true).size());
+
+ int cversion = getOverseerCversion();
+ // Thread should not publish LIR and down state for node which is not live, regardless of whether forcePublish is true or false
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+ // let's assert that we did not publish anything to overseer queue, simplest way is to assert that cversion of overseer queue zk node is still the same
+ assertEquals(cversion, getOverseerCversion());
+
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+ // let's assert that we did not publish anything to overseer queue
+ assertEquals(cversion, getOverseerCversion());
+
+
+ /*
+ 3. Test that on ZK connection loss the thread should not attempt to publish down state even if forcePublish=true
+ */
+ ChaosMonkey.start(notLeader.jetty);
+ waitForRecoveriesToFinish(true);
+
+ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+ @Override
+ protected void updateLIRState(String replicaCoreNodeName) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.ConnectionLossException());
+ }
+ };
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+ assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+
+
+ /*
+ 4. Test that on ZK connection loss or session expiry the thread should not attempt to publish down state even if forcePublish=true
+ */
+ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+ @Override
+ protected void updateLIRState(String replicaCoreNodeName) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "", new KeeperException.SessionExpiredException());
+ }
+ };
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+ assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+
+
+ /*
+ 5. Test that any exception other than ZK connection loss or session expired should publish down state only if forcePublish=true
+ */
+ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+ @Override
+ protected void updateLIRState(String replicaCoreNodeName) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bogus exception");
+ }
+ };
+ // the following should return true because regardless of the bogus exception in setting LIR state, we still want recovery commands to be sent,
+ // however the following will not publish a down state
+ cversion = getOverseerCversion();
+ assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+
+ // let's assert that we did not publish anything to overseer queue, simplest way is to assert that cversion of overseer queue zk node is still the same
+ assertEquals(cversion, getOverseerCversion());
+
+ assertTrue(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), true));
+ // this should have published a down state so assert that cversion has incremented
+ assertTrue(getOverseerCversion() > cversion);
+
+ timeOut = new TimeOut(30, TimeUnit.SECONDS);
+ while (!timeOut.hasTimedOut()) {
+ cloudClient.getZkStateReader().updateClusterState();
+ Replica r = cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, replica.getName());
+ if (r.getState() == Replica.State.DOWN) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+
+ assertNull(zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+ assertEquals(Replica.State.DOWN, cloudClient.getZkStateReader().getClusterState().getReplica(DEFAULT_COLLECTION, replica.getName()).getState());
+
+ /*
+ 6. Test that non-leader cannot set LIR nodes
+ */
+ filter = (SolrDispatchFilter) notLeader.jetty.getDispatchFilter().getFilter();
+ zkController = filter.getCores().getZkController();
+
+ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName) {
+ @Override
+ protected void updateLIRState(String replicaCoreNodeName) {
+ try {
+ super.updateLIRState(replicaCoreNodeName);
+ } catch (Exception e) {
+ assertTrue(e instanceof ZkController.NotLeaderException);
+ throw e;
+ }
+ }
+ };
+ cversion = getOverseerCversion();
+ assertFalse(thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false));
+ assertEquals(cversion, getOverseerCversion());
+
+ /*
+ 7. assert that we can write a LIR state if everything else is fine
+ */
+ // reset the zkcontroller to the one from the leader
+ filter = (SolrDispatchFilter) leaderRunner.jetty.getDispatchFilter().getFilter();
+ zkController = filter.getCores().getZkController();
+ thread = new LeaderInitiatedRecoveryThread(zkController, filter.getCores(),
+ DEFAULT_COLLECTION, SHARD1, replicaCoreNodeProps, 1, leaderCoreNodeName);
+ thread.publishDownState(replicaCoreNodeProps.getCoreName(), replica.getName(), replica.getNodeName(), replicaCoreNodeProps.getCoreUrl(), false);
+ timeOut = new TimeOut(30, TimeUnit.SECONDS);
+ while (!timeOut.hasTimedOut()) {
+ Replica.State state = zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName());
+ if (state == Replica.State.DOWN) {
+ break;
+ }
+ Thread.sleep(500);
+ }
+ assertNotNull(zkController.getLeaderInitiatedRecoveryStateObject(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+ assertEquals(Replica.State.DOWN, zkController.getLeaderInitiatedRecoveryState(DEFAULT_COLLECTION, SHARD1, replica.getName()));
+
+ /*
+ 8. TODO: placeholder for a further test case (not yet written)
+ */
+ }
+
+ protected int getOverseerCversion() throws KeeperException, InterruptedException {
+ Stat stat = new Stat();
+ cloudClient.getZkStateReader().getZkClient().getData("/overseer/queue", null, stat, true);
+ return stat.getCversion();
+ }
+
+}
Modified: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java?rev=1702067&r1=1702066&r2=1702067&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java (original)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java Wed Sep 9 18:07:45 2015
@@ -49,6 +49,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slow
+@SolrTestCaseJ4.SuppressSSL
public class ZkControllerTest extends SolrTestCaseJ4 {
private static final String COLLECTION_NAME = "collection1";
@@ -243,64 +244,6 @@ public class ZkControllerTest extends So
} finally {
if (zkController != null)
zkController.close();
- }
- } finally {
- if (cc != null) {
- cc.shutdown();
- }
- server.shutdown();
- }
- }
-
- /*
- Test that:
- 1) LIR state to 'down' is not set unless publishing node is a leader
- 1a) Test that leader can publish when LIR node already exists in zk
- 1b) Test that leader can publish when LIR node does not exist - TODO
- 2) LIR state to 'active' or 'recovery' can be set regardless of whether publishing
- node is leader or not - TODO
- */
- public void testEnsureReplicaInLeaderInitiatedRecovery() throws Exception {
- String zkDir = createTempDir("testEnsureReplicaInLeaderInitiatedRecovery").toFile().getAbsolutePath();
- CoreContainer cc = null;
-
- ZkTestServer server = new ZkTestServer(zkDir);
- try {
- server.run();
-
- AbstractZkTestCase.tryCleanSolrZkNode(server.getZkHost());
- AbstractZkTestCase.makeSolrZkNode(server.getZkHost());
-
- cc = getCoreContainer();
- ZkController zkController = null;
-
- try {
- CloudConfig cloudConfig = new CloudConfig.CloudConfigBuilder("127.0.0.1", 8983, "solr").build();
- zkController = new ZkController(cc, server.getZkAddress(), TIMEOUT, cloudConfig, new CurrentCoreDescriptorProvider() {
-
- @Override
- public List<CoreDescriptor> getCurrentDescriptors() {
- // do nothing
- return null;
- }
- });
- HashMap<String, Object> propMap = new HashMap<>();
- propMap.put(ZkStateReader.BASE_URL_PROP, "http://127.0.0.1:8983/solr");
- propMap.put(ZkStateReader.CORE_NAME_PROP, "replica1");
- propMap.put(ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
- Replica replica = new Replica("replica1", propMap);
- try {
- // this method doesn't throw exception when node isn't leader
- zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
- new ZkCoreNodeProps(replica), "non_existent_leader", false, false);
- fail("ZkController should not write LIR state for node which is not leader");
- } catch (Exception e) {
- assertNull("ZkController should not write LIR state for node which is not leader",
- zkController.getLeaderInitiatedRecoveryState("c1", "shard1", "replica1"));
- }
- } finally {
- if (zkController != null)
- zkController.close();
}
} finally {
if (cc != null) {