You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by at...@apache.org on 2023/02/02 17:04:26 UTC
[druid] branch master updated: When a task fails and doesn't throw an exception, report it correctly… (#13668)
This is an automated email from the ASF dual-hosted git repository.
atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f022a9f246 When a task fails and doesn't throw an exception, report it correctly… (#13668)
f022a9f246 is described below
commit f022a9f2469d36666a98087f577d19d52fb06223
Author: Churro <88...@users.noreply.github.com>
AuthorDate: Thu Feb 2 09:04:18 2023 -0800
When a task fails and doesn't throw an exception, report it correctly… (#13668)
* When a task fails and doesn't throw an exception, report it correctly in mm-less druid
* Removing unthrown exception from test
---
.../common/actions/UpdateStatusAction.java | 21 +++++++++++
.../druid/indexing/common/task/AbstractTask.java | 6 ++-
.../indexing/common/task/AbstractTaskTest.java | 43 +++++++++++++++++++++-
3 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
index 38dc47d651..55199ac9d0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java
@@ -28,6 +28,8 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunner;
+import java.util.Objects;
+
public class UpdateStatusAction implements TaskAction<Void>
{
@JsonIgnore
@@ -82,4 +84,23 @@ public class UpdateStatusAction implements TaskAction<Void>
"status=" + status +
'}';
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UpdateStatusAction that = (UpdateStatusAction) o;
+ return Objects.equals(status, that.status);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(status);
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index e76873e978..72bff3784b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -166,7 +166,11 @@ public abstract class AbstractTask implements Task
if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) {
return TaskStatus.failure(getId(), errorMessage);
}
- return runTask(taskToolbox);
+ TaskStatus taskStatus = runTask(taskToolbox);
+ if (taskStatus.isFailure()) {
+ failure = true;
+ }
+ return taskStatus;
}
catch (Exception e) {
failure = true;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
index fa375a9c8f..9210006a9a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java
@@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task;
import org.apache.commons.io.FileUtils;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.UpdateStatusAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.server.DruidNode;
import org.apache.druid.tasklogs.TaskLogPusher;
@@ -73,7 +75,8 @@ public class AbstractTaskTest
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
- AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) {
+ AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
+ {
@Nullable
@Override
public String setup(TaskToolbox toolbox) throws Exception
@@ -116,7 +119,8 @@ public class AbstractTaskTest
when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
- AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) {
+ AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
+ {
@Nullable
@Override
public String setup(TaskToolbox toolbox) throws Exception
@@ -136,6 +140,41 @@ public class AbstractTaskTest
verify(pusher, never()).pushTaskReports(eq("myID"), any());
}
+ @Test
+ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Exception
+ {
+ TaskToolbox toolbox = mock(TaskToolbox.class);
+ when(toolbox.getAttemptId()).thenReturn("1");
+
+ DruidNode node = new DruidNode("foo", "foo", false, 1, 2, true, true);
+ when(toolbox.getTaskExecutorNode()).thenReturn(node);
+
+ TaskLogPusher pusher = mock(TaskLogPusher.class);
+ when(toolbox.getTaskLogPusher()).thenReturn(pusher);
+
+ TaskConfig config = mock(TaskConfig.class);
+ when(config.isEncapsulatedTask()).thenReturn(true);
+ File folder = temporaryFolder.newFolder();
+ when(config.getTaskDir(eq("myID"))).thenReturn(folder);
+ when(toolbox.getConfig()).thenReturn(config);
+
+ TaskActionClient taskActionClient = mock(TaskActionClient.class);
+ when(taskActionClient.submit(any())).thenReturn(TaskConfig.class);
+ when(toolbox.getTaskActionClient()).thenReturn(taskActionClient);
+
+ AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null)
+ {
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox)
+ {
+ return TaskStatus.failure("myId", "failed");
+ }
+ };
+ task.run(toolbox);
+ UpdateStatusAction action = new UpdateStatusAction("failure");
+ verify(taskActionClient).submit(eq(action));
+ }
+
@Test
public void testBatchIOConfigAppend()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org