You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2019/10/10 22:48:44 UTC
[lucene-solr] branch branch_8x updated: SOLR-13828: Improve
ExecutePlanAction error handling.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new 9c96834 SOLR-13828: Improve ExecutePlanAction error handling.
9c96834 is described below
commit 9c96834ca7ed6d6bb4210e97c03cb3594cd98c19
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Oct 10 23:58:40 2019 +0200
SOLR-13828: Improve ExecutePlanAction error handling.
---
solr/CHANGES.txt | 2 +
.../solr/cloud/autoscaling/ExecutePlanAction.java | 82 ++++++++++--
.../java/org/apache/solr/util/TestInjection.java | 6 +
.../cloud/autoscaling/ExecutePlanActionTest.java | 144 +++++++++++++++++++++
.../src/solrcloud-autoscaling-trigger-actions.adoc | 15 ++-
5 files changed, 236 insertions(+), 13 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c3c267c..9f4af9d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -246,6 +246,8 @@ Bug Fixes
* SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing.
(Cao Manh Dat)
+* SOLR-13828: Improve ExecutePlanAction error handling. (ab)
+
Other Changes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index 6179bcc..2a7e026 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -38,6 +40,8 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.TestInjection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -52,6 +56,24 @@ public class ExecutePlanAction extends TriggerActionBase {
private static final String PREFIX = "op-";
static final int DEFAULT_TASK_TIMEOUT_SECONDS = 120;
+ public static final String TASK_TIMEOUT_SECONDS = "taskTimeoutSeconds";
+ public static final String TASK_TIMEOUT_FAIL = "taskTimeoutFail";
+
+ int taskTimeoutSeconds;
+ boolean taskTimeoutFail;
+
+ /**
+ * Creates the action and passes the {@link #TASK_TIMEOUT_SECONDS} and {@link #TASK_TIMEOUT_FAIL}
+ * property names, together with {@code validProperties}, to {@code TriggerUtils.validProperties}
+ * (presumably registering them as accepted configuration keys - confirm against TriggerUtils).
+ */
+ public ExecutePlanAction() {
+ TriggerUtils.validProperties(validProperties, TASK_TIMEOUT_SECONDS, TASK_TIMEOUT_FAIL);
+ }
+
+ /**
+ * Configures the action and reads its two optional timeout properties.
+ * <p>{@link #TASK_TIMEOUT_SECONDS} falls back to {@link #DEFAULT_TASK_TIMEOUT_SECONDS} and
+ * {@link #TASK_TIMEOUT_FAIL} falls back to {@code false} when absent from {@code properties}.</p>
+ * <p>NOTE(review): a malformed {@code taskTimeoutSeconds} value makes {@link Integer#parseInt(String)}
+ * throw {@link NumberFormatException}; consider reporting it as a validation error instead.</p>
+ */
+ @Override
+ public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
+ super.configure(loader, cloudManager, properties);
+ // normalize through String.valueOf so that both string and native-typed values can be parsed
+ Object timeoutValue = properties.getOrDefault(TASK_TIMEOUT_SECONDS, DEFAULT_TASK_TIMEOUT_SECONDS);
+ taskTimeoutSeconds = Integer.parseInt(String.valueOf(timeoutValue));
+ Object failOnTimeoutValue = properties.getOrDefault(TASK_TIMEOUT_FAIL, false);
+ taskTimeoutFail = Boolean.parseBoolean(String.valueOf(failOnTimeoutValue));
+ }
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
@@ -63,11 +85,11 @@ public class ExecutePlanAction extends TriggerActionBase {
return;
}
try {
+ int counter = 0;
for (SolrRequest operation : operations) {
log.debug("Executing operation: {}", operation.getParams());
try {
SolrResponse response = null;
- int counter = 0;
if (operation instanceof CollectionAdminRequest.AsyncCollectionAdminRequest) {
CollectionAdminRequest.AsyncCollectionAdminRequest req = (CollectionAdminRequest.AsyncCollectionAdminRequest) operation;
// waitForFinalState so that the end effects of operations are visible
@@ -77,16 +99,34 @@ public class ExecutePlanAction extends TriggerActionBase {
log.trace("Saved requestId: {} in znode: {}", asyncId, znode);
// TODO: find a better way of using async calls using dataProvider API !!!
req.setAsyncId(asyncId);
- SolrResponse asyncResponse = cloudManager.request(req);
- if (asyncResponse.getResponse().get("error") != null) {
- throw new IOException("" + asyncResponse.getResponse().get("error"));
+ if (TestInjection.delayInExecutePlanAction != null) {
+ cloudManager.getTimeSource().sleep(TestInjection.delayInExecutePlanAction);
+ }
+ CollectionAdminRequest.RequestStatusResponse statusResponse = null;
+ RequestStatusState state = RequestStatusState.FAILED;
+ if (!TestInjection.failInExecutePlanAction) {
+ SolrResponse asyncResponse = cloudManager.request(req);
+ if (asyncResponse.getResponse().get("error") != null) {
+ throw new IOException("" + asyncResponse.getResponse().get("error"));
+ }
+ asyncId = (String)asyncResponse.getResponse().get("requestid");
+ statusResponse = waitForTaskToFinish(cloudManager, asyncId,
+ taskTimeoutSeconds, TimeUnit.SECONDS);
}
- asyncId = (String)asyncResponse.getResponse().get("requestid");
- CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, asyncId,
- DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (statusResponse != null) {
- RequestStatusState state = statusResponse.getRequestStatus();
+ state = statusResponse.getRequestStatus();
+ // overwrite to test a long-running task
+ if (TestInjection.delayInExecutePlanAction != null &&
+ TestInjection.delayInExecutePlanAction > TimeUnit.MILLISECONDS.convert(taskTimeoutSeconds, TimeUnit.SECONDS)) {
+ state = RequestStatusState.RUNNING;
+ }
+ if (TestInjection.failInExecutePlanAction) {
+ state = RequestStatusState.FAILED;
+ }
+ // should we accept partial success here? i.e. some operations won't be completed
+ // successfully but the event processing will still be declared a success
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
+ // remove pending task marker for this request
try {
cloudManager.getDistribStateManager().removeData(znode, -1);
} catch (Exception e) {
@@ -95,7 +135,26 @@ public class ExecutePlanAction extends TriggerActionBase {
}
response = statusResponse;
}
+ if (state == RequestStatusState.RUNNING || state == RequestStatusState.SUBMITTED) {
+ String msg = String.format(Locale.ROOT, "Task %s is still running after " + taskTimeoutSeconds + " seconds. Consider increasing " +
+ TASK_TIMEOUT_SECONDS + " action property or `waitFor` of the trigger %s. Operation: %s",
+ asyncId, event.source, req);
+ if (taskTimeoutFail) {
+ throw new IOException(msg);
+ } else {
+ log.warn(msg);
+ }
+ } else if (state == RequestStatusState.FAILED) {
+ // remove it as a pending task
+ try {
+ cloudManager.getDistribStateManager().removeData(znode, -1);
+ } catch (Exception e) {
+ log.warn("Unexpected exception while trying to delete znode: " + znode, e);
+ }
+ throw new IOException("Task " + asyncId + " failed: " + (statusResponse != null ? statusResponse : " timed out. Operation: " + req));
+ }
} else {
+ // generic response - can't easily determine success or failure
response = cloudManager.request(operation);
}
NamedList<Object> result = response.getResponse();
@@ -105,6 +164,7 @@ public class ExecutePlanAction extends TriggerActionBase {
responses.add(result);
return responses;
});
+ counter++;
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception executing operation: " + operation.getParams(), e);
@@ -160,12 +220,14 @@ public class ExecutePlanAction extends TriggerActionBase {
}
cloudManager.getTimeSource().sleep(5000);
}
- log.debug("Task with requestId={} did not complete within 5 minutes. Last state={}", requestId, state);
+ // arguments must follow placeholder order: requestId, timeout in seconds, last observed state
+ log.debug("Task with requestId={} did not complete within {} seconds. Last state={}", requestId, timeoutSeconds, state);
return statusResponse;
}
/**
- * Saves the given asyncId in ZK as a persistent sequential node.
+ * Saves the given asyncId in ZK as a persistent sequential node. This allows us to wait for the completion
+ * of pending tasks from this event in {@link ScheduledTriggers}
+ * before starting the actions of the next event.
*
* @return the path of the newly created node in ZooKeeper
*/
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 12de7fe..ef140d0 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -141,6 +141,10 @@ public class TestInjection {
public volatile static Integer delayBeforeSlaveCommitRefresh=null;
+ public volatile static Integer delayInExecutePlanAction=null;
+
+ public volatile static boolean failInExecutePlanAction = false;
+
public volatile static boolean uifOutOfMemoryError = false;
public volatile static Map<String, String> additionalSystemProps = null;
@@ -171,6 +175,8 @@ public class TestInjection {
failIndexFingerprintRequests = null;
wrongIndexFingerprint = null;
delayBeforeSlaveCommitRefresh = null;
+ delayInExecutePlanAction = null;
+ failInExecutePlanAction = false;
uifOutOfMemoryError = false;
notifyPauseForeverDone();
newSearcherHooks.clear();
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
index d6e44ca..d286faf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
@@ -22,6 +22,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
+import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -44,6 +48,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
@@ -66,6 +71,26 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
private SolrResourceLoader loader;
private SolrCloudManager cloudManager;
+ /**
+ * Test-only trigger action that counts down {@code startedProcessing} when it is invoked,
+ * so a test can wait until the action chain has started running.
+ */
+ public static class StartAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) throws Exception {
+ // release any test thread awaiting the start of event processing
+ startedProcessing.countDown();
+ }
+ }
+
+ private static CountDownLatch startedProcessing = new CountDownLatch(1);
+
+ /**
+ * Test-only trigger action that counts down {@code finishedProcessing} when it is invoked.
+ * The tests list it as the last action of a trigger, so the latch opening means the
+ * preceding actions did not abort the chain.
+ */
+ public static class FinishAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) throws Exception {
+ // release any test thread awaiting the end of event processing
+ finishedProcessing.countDown();
+ }
+ }
+
+ private static CountDownLatch finishedProcessing = new CountDownLatch(1);
+
@BeforeClass
public static void setupCluster() throws Exception {
@@ -84,6 +109,9 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+
+ finishedProcessing = new CountDownLatch(1);
+ startedProcessing = new CountDownLatch(1);
}
@@ -91,6 +119,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
public void tearDown() throws Exception {
shutdownCluster();
super.tearDown();
+ TestInjection.reset();
}
@Test
@@ -233,4 +262,119 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
assertNotNull(replicasOnSurvivor);
assertEquals(docCollection.toString(), 2, replicasOnSurvivor.size());
}
+
+ /**
+ * Checks how ExecutePlanAction handles a task that outlives {@code taskTimeoutSeconds}.
+ * A 2000 ms delay is injected while the action is configured with a 1 second timeout and a
+ * randomly chosen {@code taskTimeoutFail}. The finish action is expected to run only when
+ * {@code taskTimeoutFail} is false; in both cases the pending request marker must still exist
+ * and the collection must eventually return to 2 replicas.
+ */
+ @Test
+ public void testTaskTimeout() throws Exception {
+ int DELAY = 2000;
+ boolean taskTimeoutFail = random().nextBoolean();
+ // the injected delay (ms) exceeds the 1 s taskTimeoutSeconds configured below
+ TestInjection.delayInExecutePlanAction = DELAY;
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String triggerName = "node_lost_trigger2";
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : '" + triggerName + "'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+ "{'name':'execute_plan','class':'solr.ExecutePlanAction', 'taskTimeoutSeconds' : '1','taskTimeoutFail':'" + taskTimeoutFail + "'}," +
+ "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ // expected value goes first so that a failure message reads correctly
+ assertEquals("success", response.get("result").toString());
+
+ String collectionName = "testTaskTimeout";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 1, 2);
+ create.setMaxShardsPerNode(1);
+ create.process(solrClient);
+
+ cluster.waitForActiveCollection(collectionName, 1, 2);
+
+ waitForState("Timed out waiting for replicas of new collection to be active",
+ collectionName, clusterShape(1, 2));
+
+ JettySolrRunner sourceNode = cluster.getRandomJetty(random());
+
+ // stop the randomly selected node to produce a nodeLost event
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner runner = cluster.getJettySolrRunner(i);
+ if (runner == sourceNode) {
+ JettySolrRunner j = cluster.stopJettySolrRunner(i);
+ cluster.waitForJettyToStop(j);
+ }
+ }
+
+ boolean await = finishedProcessing.await(DELAY * 5, TimeUnit.MILLISECONDS);
+ if (taskTimeoutFail) {
+ assertFalse("finished processing event but should fail", await);
+ } else {
+ assertTrue("did not finish processing event in time", await);
+ }
+ String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan";
+ assertTrue(path + " does not exist", zkClient().exists(path, true));
+ List<String> requests = zkClient().getChildren(path, null, true);
+ assertFalse("some requests should be still present", requests.isEmpty());
+
+ // in either case the task will complete and move the replica as needed
+ waitForState("Timed out waiting for replicas of collection to be 2 again",
+ collectionName, clusterShape(1, 2));
+ }
+
+ /**
+ * Checks how ExecutePlanAction handles a failed task. Failure is forced through
+ * {@code TestInjection.failInExecutePlanAction}; the start action must run, the finish action
+ * must not run within 2 seconds, no pending request markers may remain, and the collection
+ * must not get back to 2 replicas within 5 seconds.
+ */
+ @Test
+ public void testTaskFail() throws Exception {
+ TestInjection.failInExecutePlanAction = true;
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String triggerName = "node_lost_trigger3";
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : '" + triggerName + "'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'start', 'class' : '" + StartAction.class.getName() + "'}," +
+ "{'name':'compute_plan','class':'solr.ComputePlanAction'}," +
+ "{'name':'execute_plan','class':'solr.ExecutePlanAction'}," +
+ "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ // expected value goes first so that a failure message reads correctly
+ assertEquals("success", response.get("result").toString());
+
+ String collectionName = "testTaskFail";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 1, 2);
+ create.setMaxShardsPerNode(1);
+ create.process(solrClient);
+
+ cluster.waitForActiveCollection(collectionName, 1, 2);
+
+ waitForState("Timed out waiting for replicas of new collection to be active",
+ collectionName, clusterShape(1, 2));
+
+ // don't stop the jetty that runs our SolrCloudManager
+ JettySolrRunner runner = cluster.stopJettySolrRunner(1);
+ cluster.waitForJettyToStop(runner);
+
+ boolean await = startedProcessing.await(10, TimeUnit.SECONDS);
+ assertTrue("did not start processing event in time", await);
+ // the finish action must never be reached because execute_plan is forced to fail
+ await = finishedProcessing.await(2, TimeUnit.SECONDS);
+ assertFalse("finished processing event but should fail", await);
+
+ String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan";
+ assertTrue(path + " does not exist", zkClient().exists(path, true));
+ List<String> requests = zkClient().getChildren(path, null, true);
+ assertTrue("there should be no requests pending but got " + requests, requests.isEmpty());
+
+ // the task never completed - we actually lost a replica
+ try {
+ CloudUtil.waitForState(cloudManager, collectionName, 5, TimeUnit.SECONDS,
+ CloudUtil.clusterShape(1, 2));
+ fail("completed a task that should have failed");
+ } catch (TimeoutException te) {
+ // expected
+ }
+ }
}
diff --git a/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc b/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc
index 5571377..3ad3772 100644
--- a/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc
+++ b/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc
@@ -69,10 +69,19 @@ The `ExecutePlanAction` executes the Collection API commands emitted by the `Com
the cluster using SolrJ. It executes the commands serially, waiting for each of them to succeed before
continuing with the next one.
-Currently, it has no configurable parameters.
+Currently, it has the following configurable parameters:
-If any one of the commands fail, then the complete chain of actions are
-executed again at the next run of the trigger. If the Overseer node fails while `ExecutePlanAction` is running,
+`taskTimeoutSeconds`::
+The default value of this parameter is 120 seconds. It defines how long the action waits for a
+command to complete its execution. If the timeout is reached while the command is still running, then
+the command status is provisionally considered a success and a warning is logged, unless `taskTimeoutFail`
+is set to true.
+
+`taskTimeoutFail`::
+Boolean with a default value of false. If this value is true, then a timeout in command processing is
+treated as a failure and an exception is thrown.
+
+If the Overseer node fails while `ExecutePlanAction` is running,
then the new Overseer node will run the chain of actions for the same event again after waiting for any
running Collection API operations belonging to the event to complete.