Posted to commits@lucene.apache.org by ab...@apache.org on 2019/10/10 21:59:15 UTC

[lucene-solr] branch master updated: SOLR-13828: Improve ExecutePlanAction error handling.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 430267e  SOLR-13828: Improve ExecutePlanAction error handling.
430267e is described below

commit 430267ecc9d83dc36cda3255a1a86710d2647f2b
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Oct 10 23:58:40 2019 +0200

    SOLR-13828: Improve ExecutePlanAction error handling.
---
 solr/CHANGES.txt                                   |   2 +
 .../solr/cloud/autoscaling/ExecutePlanAction.java  |  82 ++++++++++--
 .../java/org/apache/solr/util/TestInjection.java   |   6 +
 .../cloud/autoscaling/ExecutePlanActionTest.java   | 144 +++++++++++++++++++++
 .../src/solrcloud-autoscaling-trigger-actions.adoc |  15 ++-
 5 files changed, 236 insertions(+), 13 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d4602b3..f778e6f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -308,6 +308,8 @@ Bug Fixes
 * SOLR-13293: ConcurrentUpdateHttp2SolrClient always log AsynchronousCloseException exception error on indexing.
   (Cao Manh Dat)
 
+* SOLR-13828: Improve ExecutePlanAction error handling. (ab)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index 6179bcc..2a7e026 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -38,6 +40,8 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.TestInjection;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -52,6 +56,24 @@ public class ExecutePlanAction extends TriggerActionBase {
   private static final String PREFIX = "op-";
 
   static final int DEFAULT_TASK_TIMEOUT_SECONDS = 120;
+  public static final String TASK_TIMEOUT_SECONDS = "taskTimeoutSeconds";
+  public static final String TASK_TIMEOUT_FAIL = "taskTimeoutFail";
+
+  int taskTimeoutSeconds;
+  boolean taskTimeoutFail;
+
+  public ExecutePlanAction() {
+    TriggerUtils.validProperties(validProperties, TASK_TIMEOUT_SECONDS, TASK_TIMEOUT_FAIL);
+  }
+
+  @Override
+  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
+    super.configure(loader, cloudManager, properties);
+    String str = String.valueOf(properties.getOrDefault(TASK_TIMEOUT_SECONDS, DEFAULT_TASK_TIMEOUT_SECONDS));
+    taskTimeoutSeconds = Integer.parseInt(str);
+    str = String.valueOf(properties.getOrDefault(TASK_TIMEOUT_FAIL, false));
+    taskTimeoutFail = Boolean.parseBoolean(str);
+  }
 
   @Override
   public void process(TriggerEvent event, ActionContext context) throws Exception {
@@ -63,11 +85,11 @@ public class ExecutePlanAction extends TriggerActionBase {
       return;
     }
     try {
+      int counter = 0;
       for (SolrRequest operation : operations) {
         log.debug("Executing operation: {}", operation.getParams());
         try {
           SolrResponse response = null;
-          int counter = 0;
           if (operation instanceof CollectionAdminRequest.AsyncCollectionAdminRequest) {
             CollectionAdminRequest.AsyncCollectionAdminRequest req = (CollectionAdminRequest.AsyncCollectionAdminRequest) operation;
             // waitForFinalState so that the end effects of operations are visible
@@ -77,16 +99,34 @@ public class ExecutePlanAction extends TriggerActionBase {
             log.trace("Saved requestId: {} in znode: {}", asyncId, znode);
             // TODO: find a better way of using async calls using dataProvider API !!!
             req.setAsyncId(asyncId);
-            SolrResponse asyncResponse = cloudManager.request(req);
-            if (asyncResponse.getResponse().get("error") != null) {
-              throw new IOException("" + asyncResponse.getResponse().get("error"));
+            if (TestInjection.delayInExecutePlanAction != null) {
+              cloudManager.getTimeSource().sleep(TestInjection.delayInExecutePlanAction);
+            }
+            CollectionAdminRequest.RequestStatusResponse statusResponse = null;
+            RequestStatusState state = RequestStatusState.FAILED;
+            if (!TestInjection.failInExecutePlanAction) {
+              SolrResponse asyncResponse = cloudManager.request(req);
+              if (asyncResponse.getResponse().get("error") != null) {
+                throw new IOException("" + asyncResponse.getResponse().get("error"));
+              }
+              asyncId = (String)asyncResponse.getResponse().get("requestid");
+              statusResponse = waitForTaskToFinish(cloudManager, asyncId,
+                  taskTimeoutSeconds, TimeUnit.SECONDS);
             }
-            asyncId = (String)asyncResponse.getResponse().get("requestid");
-            CollectionAdminRequest.RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, asyncId,
-                DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             if (statusResponse != null) {
-              RequestStatusState state = statusResponse.getRequestStatus();
+              state = statusResponse.getRequestStatus();
+              // overwrite to test a long-running task
+              if (TestInjection.delayInExecutePlanAction != null &&
+                  TestInjection.delayInExecutePlanAction > TimeUnit.MILLISECONDS.convert(taskTimeoutSeconds, TimeUnit.SECONDS)) {
+                state = RequestStatusState.RUNNING;
+              }
+              if (TestInjection.failInExecutePlanAction) {
+                state = RequestStatusState.FAILED;
+              }
+              // should we accept partial success here? i.e. some operations won't be completed
+              // successfully but the event processing will still be declared a success
               if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
+                // remove pending task marker for this request
                 try {
                   cloudManager.getDistribStateManager().removeData(znode, -1);
                 } catch (Exception e) {
@@ -95,7 +135,26 @@ public class ExecutePlanAction extends TriggerActionBase {
               }
               response = statusResponse;
             }
+            if (state == RequestStatusState.RUNNING || state == RequestStatusState.SUBMITTED) {
+              String msg = String.format(Locale.ROOT, "Task %s is still running after " + taskTimeoutSeconds + " seconds. Consider increasing " +
+                      TASK_TIMEOUT_SECONDS + " action property or `waitFor` of the trigger %s. Operation: %s",
+                  asyncId, event.source, req);
+              if (taskTimeoutFail) {
+                throw new IOException(msg);
+              } else {
+                log.warn(msg);
+              }
+            } else if (state == RequestStatusState.FAILED) {
+              // remove it as a pending task
+              try {
+                cloudManager.getDistribStateManager().removeData(znode, -1);
+              } catch (Exception e) {
+                log.warn("Unexpected exception while trying to delete znode: " + znode, e);
+              }
+              throw new IOException("Task " + asyncId + " failed: " + (statusResponse != null ? statusResponse : " timed out. Operation: " + req));
+            }
           } else {
+            // generic response - can't easily determine success or failure
             response = cloudManager.request(operation);
           }
           NamedList<Object> result = response.getResponse();
@@ -105,6 +164,7 @@ public class ExecutePlanAction extends TriggerActionBase {
             responses.add(result);
             return responses;
           });
+          counter++;
         } catch (IOException e) {
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
               "Unexpected exception executing operation: " + operation.getParams(), e);
@@ -160,12 +220,14 @@ public class ExecutePlanAction extends TriggerActionBase {
       }
       cloudManager.getTimeSource().sleep(5000);
     }
-    log.debug("Task with requestId={} did not complete within 5 minutes. Last state={}", requestId, state);
+    log.debug("Task with requestId={} did not complete within {} seconds. Last state={}", timeoutSeconds, requestId, state);
     return statusResponse;
   }
 
   /**
-   * Saves the given asyncId in ZK as a persistent sequential node.
+   * Saves the given asyncId in ZK as a persistent sequential node. This allows us to wait for the completion
+   * of pending tasks from this event in {@link ScheduledTriggers}
+   * before starting the actions of the next event.
    *
    * @return the path of the newly created node in ZooKeeper
    */
diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
index 12de7fe..ef140d0 100644
--- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
+++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
@@ -141,6 +141,10 @@ public class TestInjection {
 
   public volatile static Integer delayBeforeSlaveCommitRefresh=null;
 
+  public volatile static Integer delayInExecutePlanAction=null;
+
+  public volatile static boolean failInExecutePlanAction = false;
+
   public volatile static boolean uifOutOfMemoryError = false;
 
   public volatile static Map<String, String> additionalSystemProps = null;
@@ -171,6 +175,8 @@ public class TestInjection {
     failIndexFingerprintRequests = null;
     wrongIndexFingerprint = null;
     delayBeforeSlaveCommitRefresh = null;
+    delayInExecutePlanAction = null;
+    failInExecutePlanAction = false;
     uifOutOfMemoryError = false;
     notifyPauseForeverDone();
     newSearcherHooks.clear();
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
index d6e44ca..d286faf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ExecutePlanActionTest.java
@@ -22,6 +22,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
@@ -33,6 +36,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
+import org.apache.solr.cloud.CloudUtil;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
@@ -44,6 +48,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TestInjection;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -66,6 +71,26 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
   private SolrResourceLoader loader;
   private SolrCloudManager cloudManager;
 
+  public static class StartAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      startedProcessing.countDown();
+    }
+  }
+
+  private static CountDownLatch startedProcessing = new CountDownLatch(1);
+
+  public static class FinishAction extends TriggerActionBase {
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      finishedProcessing.countDown();
+    }
+  }
+
+  private static CountDownLatch finishedProcessing = new CountDownLatch(1);
+
   @BeforeClass
   public static void setupCluster() throws Exception {
 
@@ -84,6 +109,9 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
 
 
     cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+
+    finishedProcessing = new CountDownLatch(1);
+    startedProcessing = new CountDownLatch(1);
   }
   
 
@@ -91,6 +119,7 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
   public void tearDown() throws Exception  {
     shutdownCluster();
     super.tearDown();
+    TestInjection.reset();
   }
 
   @Test
@@ -233,4 +262,119 @@ public class ExecutePlanActionTest extends SolrCloudTestCase {
     assertNotNull(replicasOnSurvivor);
     assertEquals(docCollection.toString(), 2, replicasOnSurvivor.size());
   }
+
+  @Test
+  public void testTaskTimeout() throws Exception  {
+    int DELAY = 2000;
+    boolean taskTimeoutFail = random().nextBoolean();
+    TestInjection.delayInExecutePlanAction = DELAY;
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String triggerName = "node_lost_trigger2";
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : '" + triggerName + "'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+        "{'name':'execute_plan','class':'solr.ExecutePlanAction', 'taskTimeoutSeconds' : '1','taskTimeoutFail':'" + taskTimeoutFail + "'}," +
+        "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String collectionName = "testTaskTimeout";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 1, 2);
+    create.setMaxShardsPerNode(1);
+    create.process(solrClient);
+
+    cluster.waitForActiveCollection(collectionName, 1, 2);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        collectionName, clusterShape(1, 2));
+
+    JettySolrRunner sourceNode = cluster.getRandomJetty(random());
+
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner runner = cluster.getJettySolrRunner(i);
+      if (runner == sourceNode) {
+        JettySolrRunner j = cluster.stopJettySolrRunner(i);
+        cluster.waitForJettyToStop(j);
+      }
+    }
+
+    boolean await = finishedProcessing.await(DELAY * 5, TimeUnit.MILLISECONDS);
+    if (taskTimeoutFail) {
+      assertFalse("finished processing event but should fail", await);
+    } else {
+      assertTrue("did not finish processing event in time", await);
+    }
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan";
+    assertTrue(path + " does not exist", zkClient().exists(path, true));
+    List<String> requests = zkClient().getChildren(path, null, true);
+    assertFalse("some requests should be still present", requests.isEmpty());
+
+    // in either case the task will complete and move the replica as needed
+    waitForState("Timed out waiting for replicas of collection to be 2 again",
+        collectionName, clusterShape(1, 2));
+  }
+
+  @Test
+  public void testTaskFail() throws Exception  {
+    TestInjection.failInExecutePlanAction = true;
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    String triggerName = "node_lost_trigger3";
+
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : '" + triggerName + "'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'start', 'class' : '" + StartAction.class.getName() + "'}," +
+        "{'name':'compute_plan','class':'solr.ComputePlanAction'}," +
+        "{'name':'execute_plan','class':'solr.ExecutePlanAction'}," +
+        "{'name':'finish','class':'" + FinishAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    String collectionName = "testTaskFail";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 1, 2);
+    create.setMaxShardsPerNode(1);
+    create.process(solrClient);
+
+    cluster.waitForActiveCollection(collectionName, 1, 2);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        collectionName, clusterShape(1, 2));
+
+    // don't stop the jetty that runs our SolrCloudManager
+    JettySolrRunner runner = cluster.stopJettySolrRunner(1);
+    cluster.waitForJettyToStop(runner);
+
+    boolean await = startedProcessing.await(10, TimeUnit.SECONDS);
+    assertTrue("did not start processing event in time", await);
+    await = finishedProcessing.await(2, TimeUnit.SECONDS);
+    assertFalse("finished processing event but should fail", await);
+
+    String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName + "/execute_plan";
+    assertTrue(path + " does not exist", zkClient().exists(path, true));
+    List<String> requests = zkClient().getChildren(path, null, true);
+    assertTrue("there should be no requests pending but got " + requests, requests.isEmpty());
+
+    // the task never completed - we actually lost a replica
+    try {
+      CloudUtil.waitForState(cloudManager, collectionName, 5, TimeUnit.SECONDS,
+          CloudUtil.clusterShape(1, 2));
+      fail("completed a task that should have failed");
+    } catch (TimeoutException te) {
+      // expected
+    }
+  }
 }
diff --git a/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc b/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc
index 5571377..3ad3772 100644
--- a/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc
+++ b/solr/solr-ref-guide/src/solrcloud-autoscaling-trigger-actions.adoc
@@ -69,10 +69,19 @@ The `ExecutePlanAction` executes the Collection API commands emitted by the `Com
 the cluster using SolrJ. It executes the commands serially, waiting for each of them to succeed before
 continuing with the next one.
 
-Currently, it has no configurable parameters.
+Currently, it has the following configurable parameters:
 
-If any one of the commands fail, then the complete chain of actions are
-executed again at the next run of the trigger. If the Overseer node fails while `ExecutePlanAction` is running,
+`taskTimeoutSeconds`::
+The default value of this parameter is 120 seconds. It defines how long the action will wait for a
+command to complete its execution. If the timeout is reached while the command is still running, then
+the command status is provisionally considered a success but a warning is logged, unless `taskTimeoutFail`
+is set to true.
+
+`taskTimeoutFail`::
+Boolean with a default value of false. If this value is true, then a timeout in command processing is
+marked as a failure and an exception is thrown.
+
+If the Overseer node fails while `ExecutePlanAction` is running,
 then the new Overseer node will run the chain of actions for the same event again after waiting for any
 running Collection API operations belonging to the event to complete.
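For illustration only (not part of this commit): a `set-trigger` autoscaling command using the new
ExecutePlanAction properties could look like the sketch below, mirroring the structure used in the
test added above. The trigger name, `waitFor`, and the timeout values are arbitrary examples.

    {
      "set-trigger": {
        "name": "node_lost_trigger",
        "event": "nodeLost",
        "waitFor": "1s",
        "enabled": true,
        "actions": [
          {"name": "compute_plan", "class": "solr.ComputePlanAction"},
          {"name": "execute_plan", "class": "solr.ExecutePlanAction",
           "taskTimeoutSeconds": 180,
           "taskTimeoutFail": true}
        ]
      }
    }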