You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/27 18:07:35 UTC

[druid] branch master updated: Add timeout to TaskStartTimeoutFault. (#13970)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 062d72b67e Add timeout to TaskStartTimeoutFault. (#13970)
062d72b67e is described below

commit 062d72b67eccbb754fe74c7328cd06e0026fe5bf
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Mon Mar 27 11:07:19 2023 -0700

    Add timeout to TaskStartTimeoutFault. (#13970)
    
    * Add timeout to TaskStartTimeoutFault.
    
    Makes the error message a bit more useful.
    
    * Update docs.
---
 docs/multi-stage-query/reference.md                |  2 +-
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  |  2 +-
 .../msq/indexing/error/TaskStartTimeoutFault.java  | 38 ++++++++++++++++++----
 .../org/apache/druid/msq/exec/MSQTasksTest.java    |  2 +-
 .../msq/indexing/error/MSQFaultSerdeTest.java      |  2 +-
 5 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 71fc1b43af..b1a25b80e4 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -751,7 +751,7 @@ The following table describes error codes you may encounter in the `multiStageQu
 | <a name="error_InvalidNullByte">`InvalidNullByte`</a> | A string column included a null byte. Null bytes in strings are not permitted. | `column`: The column that included the null byte |
 | <a name="error_QueryNotSupported">`QueryNotSupported`</a> | QueryKit could not translate the provided native query to a multi-stage query.<br /> <br />This can happen if the query uses features that aren't supported, like GROUPING SETS. | |
 | <a name="error_RowTooLarge">`RowTooLarge`</a> | The query tried to process a row that was too large to write to a single frame. See the [Limits](#limits) table for specific limits on frame size. Note that the effective maximum row size is smaller than the maximum frame size due to alignment considerations during frame writing. | `maxFrameSize`: The limit on the frame size. |
-| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch all the worker tasks in time. <br /> <br />There might be insufficient available slots to start all the worker tasks simultaneously.<br /> <br /> Try splitting up the query into smaller chunks with lesser `maxNumTasks` number. Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch. |
+| <a name="error_TaskStartTimeout">`TaskStartTimeout`</a> | Unable to launch `numTasks` tasks within `timeout` milliseconds.<br /><br />There may be insufficient available slots to start all the worker tasks simultaneously. Try splitting up your query into smaller chunks using a smaller value of [`maxNumTasks`](#context-parameters). Another option is to increase capacity. | `numTasks`: The number of tasks attempted to launch.<br /><br />`timeout`: Timeout, in milliseconds, that was exceeded. |
 | <a name="error_TooManyAttemptsForJob">`TooManyAttemptsForJob`</a> | Total relaunch attempt count across all workers exceeded max relaunch attempt limit. See the [Limits](#limits) table for the specific limit. | `maxRelaunchCount`: Max number of relaunches across all the workers defined in the [Limits](#limits) section. <br /><br /> `currentRelaunchCount`: current relaunch counter for the job across all workers. <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootError [...]
 | <a name="error_TooManyAttemptsForWorker">`TooManyAttemptsForWorker`</a> | Worker exceeded maximum relaunch attempt count as defined in the [Limits](#limits) section. |`maxPerWorkerRelaunchCount`: Max number of relaunches allowed per worker as defined in the [Limits](#limits) section. <br /><br /> `workerNumber`: the worker number for which the task failed <br /><br /> `taskId`: Latest task id which failed <br /> <br /> `rootErrorMessage`: Error message of the latest failed task.|
 | <a name="error_TooManyBuckets">`TooManyBuckets`</a> | Exceeded the maximum number of partition buckets for a stage (5,000 partition buckets).<br /><br />Partition buckets are created for each [`PARTITIONED BY`](#partitioned-by) time chunk for INSERT and REPLACE queries. The most common reason for this error is that your `PARTITIONED BY` is too narrow relative to your data. | `maxBuckets`: The limit on partition buckets. |
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index b6d1665015..d9870daf39 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -494,7 +494,7 @@ public class MSQWorkerTaskLauncher
 
       } else if (tracker.didRunTimeOut(maxTaskStartDelayMillis) && !canceledWorkerTasks.contains(taskId)) {
         removeWorkerFromFullyStartedWorkers(tracker);
-        throw new MSQException(new TaskStartTimeoutFault(numTasks + 1));
+        throw new MSQException(new TaskStartTimeoutFault(numTasks + 1, maxTaskStartDelayMillis));
       } else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
         removeWorkerFromFullyStartedWorkers(tracker);
         log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, tracker.status.getErrorMsg());
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java
index dceb730393..43c5a802e5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TaskStartTimeoutFault.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 @JsonTypeName(TaskStartTimeoutFault.CODE)
 public class TaskStartTimeoutFault extends BaseMSQFault
@@ -32,18 +33,26 @@ public class TaskStartTimeoutFault extends BaseMSQFault
   static final String CODE = "TaskStartTimeout";
 
   private final int numTasks;
+  private final long timeout;
 
   @JsonCreator
-  public TaskStartTimeoutFault(@JsonProperty("numTasks") int numTasks)
+  public TaskStartTimeoutFault(
+      @JsonProperty("numTasks") int numTasks,
+      @JsonProperty("timeout") long timeout
+  )
   {
     super(
         CODE,
-        "Unable to launch all the worker tasks in time. There might be insufficient available slots to start all the worker tasks simultaneously."
-        + " Try lowering '%s' in your query context to lower than [%d] tasks, or increasing capacity.",
-        MultiStageQueryContext.CTX_MAX_NUM_TASKS,
-        numTasks
+        "Unable to launch [%d] worker tasks within [%,d] seconds. "
+        + "There might be insufficient available slots to start all worker tasks simultaneously. "
+        + "Try lowering '%s' in your query context to a number that fits within your available task capacity, "
+        + "or try increasing capacity.",
+        numTasks,
+        TimeUnit.MILLISECONDS.toSeconds(timeout),
+        MultiStageQueryContext.CTX_MAX_NUM_TASKS
     );
     this.numTasks = numTasks;
+    this.timeout = timeout;
   }
 
   @JsonProperty
@@ -52,6 +61,12 @@ public class TaskStartTimeoutFault extends BaseMSQFault
     return numTasks;
   }
 
+  @JsonProperty
+  public long getTimeout()
+  {
+    return timeout;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -65,12 +80,21 @@ public class TaskStartTimeoutFault extends BaseMSQFault
       return false;
     }
     TaskStartTimeoutFault that = (TaskStartTimeoutFault) o;
-    return numTasks == that.numTasks;
+    return numTasks == that.numTasks && timeout == that.timeout;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), numTasks);
+    return Objects.hash(super.hashCode(), numTasks, timeout);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskStartTimeoutFault{" +
+           "numTasks=" + numTasks +
+           ", timeout=" + timeout +
+           '}';
   }
 }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
index bb1d1b1dbc..8371705dfd 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQTasksTest.java
@@ -183,7 +183,7 @@ public class MSQTasksTest
     }
     catch (Exception e) {
       Assert.assertEquals(
-          MSQFaultUtils.generateMessageWithErrorCode(new TaskStartTimeoutFault(numTasks + 1)),
+          MSQFaultUtils.generateMessageWithErrorCode(new TaskStartTimeoutFault(numTasks + 1, 5000)),
           MSQFaultUtils.generateMessageWithErrorCode(((MSQException) e.getCause()).getFault())
       );
     }
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 435c5de065..1ad4d08b42 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -64,7 +64,7 @@ public class MSQFaultSerdeTest
     assertFaultSerde(new NotEnoughMemoryFault(1000, 1000, 900, 1, 2));
     assertFaultSerde(QueryNotSupportedFault.INSTANCE);
     assertFaultSerde(new RowTooLargeFault(1000));
-    assertFaultSerde(new TaskStartTimeoutFault(10));
+    assertFaultSerde(new TaskStartTimeoutFault(10, 11));
     assertFaultSerde(new TooManyBucketsFault(10));
     assertFaultSerde(new TooManyColumnsFault(10, 8));
     assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8, 1));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org