You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by at...@apache.org on 2023/02/02 17:04:26 UTC

[druid] branch master updated: When a task fails and doesn't throw an exception, report it correctly… (#13668)

This is an automated email from the ASF dual-hosted git repository.

atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f022a9f246 When a task fails and doesn't throw an exception, report it correctly… (#13668)
f022a9f246 is described below

commit f022a9f2469d36666a98087f577d19d52fb06223
Author: Churro <88...@users.noreply.github.com>
AuthorDate: Thu Feb 2 09:04:18 2023 -0800

    When a task fails and doesn't throw an exception, report it correctly… (#13668)
    
    * When a task fails and doesn't throw an exception, report it correctly in mm-less druid
    
    * Removing unthrown exception from test
---
 .../common/actions/UpdateStatusAction.java         | 21 +++++++++++
 .../druid/indexing/common/task/AbstractTask.java   |  6 ++-
 .../indexing/common/task/AbstractTaskTest.java     | 43 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
index 38dc47d651..55199ac9d0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
@@ -28,6 +28,8 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunner;
 
+import java.util.Objects;
+
 public class UpdateStatusAction implements TaskAction<Void>
 {
   @JsonIgnore
@@ -82,4 +84,23 @@ public class UpdateStatusAction implements TaskAction<Void>
            "status=" + status +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UpdateStatusAction that = (UpdateStatusAction) o;
+    return Objects.equals(status, that.status);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(status);
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index e76873e978..72bff3784b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -166,7 +166,11 @@ public abstract class AbstractTask implements Task
       if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
         return TaskStatus.failure(getId(), errorMessage);
       }
-      return runTask(taskToolbox);
+      TaskStatus taskStatus = runTask(taskToolbox);
+      if (taskStatus.isFailure()) {
+        failure = true;
+      }
+      return taskStatus;
     }
     catch (Exception e) {
       failure = true;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index fa375a9c8f..9210006a9a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -20,8 +20,10 @@
 package org.apache.druid.indexing.common.task;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.UpdateStatusAction;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.tasklogs.TaskLogPusher;
@@ -73,7 +75,8 @@ public class AbstractTaskTest
     when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
 
 
-    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) {
+    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
+    {
       @Nullable
       @Override
       public String setup(TaskToolbox toolbox) throws Exception
@@ -116,7 +119,8 @@ public class AbstractTaskTest
     when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
 
 
-    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) {
+    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
+    {
       @Nullable
       @Override
       public String setup(TaskToolbox toolbox) throws Exception
@@ -136,6 +140,41 @@ public class AbstractTaskTest
     verify(pusher, never()).pushTaskReports(eq("myID"), any());
   }
 
+  @Test
+  public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Exception
+  {
+    TaskToolbox toolbox = mock(TaskToolbox.class);
+    when(toolbox.getAttemptId()).thenReturn("1");
+
+    DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
+    when(toolbox.getTaskExecutorNode()).thenReturn(node);
+
+    TaskLogPusher pusher = mock(TaskLogPusher.class);
+    when(toolbox.getTaskLogPusher()).thenReturn(pusher);
+
+    TaskConfig config = mock(TaskConfig.class);
+    when(config.isEncapsulatedTask()).thenReturn(true);
+    File folder = temporaryFolder.newFolder();
+    when(config.getTaskDir(eq("myID"))).thenReturn(folder);
+    when(toolbox.getConfig()).thenReturn(config);
+
+    TaskActionClient taskActionClient = mock(TaskActionClient.class);
+    when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
+    when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+
+    AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
+    {
+      @Override
+      public TaskStatus runTask(TaskToolbox toolbox) 
+      {
+        return TaskStatus.failure("myId", "failed");
+      }
+    };
+    task.run(toolbox);
+    UpdateStatusAction action = new UpdateStatusAction("failure");
+    verify(taskActionClient).submit(eq(action));
+  }
+
   @Test
   public void testBatchIOConfigAppend()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org