Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/07/15 02:03:03 UTC

[GitHub] [druid] jihoonson opened a new pull request #11446: Mm task failures

jihoonson opened a new pull request #11446:
URL: https://github.com/apache/druid/pull/11446


   ### Description
   
   This PR is a follow-up to https://github.com/apache/druid/pull/11419 and properly populates the error message in `TaskStatus` when a task fails in the middleManager, indexer, or peon. It includes a minor refactoring of `ForkingTaskRunner` to make it more testable. Other than that, all changes are quite straightforward.
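
The testability refactoring mentioned above can be illustrated with a minimal, self-contained sketch. This is not the Druid code: `ProcessRunner`, `waitForProcessToComplete`, and `ProcessRunnerDemo` are hypothetical names chosen for illustration only. The idea, as far as the diff in this thread suggests, is that the blocking wait on the child process moves into its own overridable method, so a test can substitute an exit code without forking anything.

```java
// Hypothetical stand-in for a runner that forks a child process.
// None of these names come from Druid; they only illustrate the pattern.
class ProcessRunner
{
  // Overridable seam: production code would block on the real child process here.
  int waitForProcessToComplete()
  {
    throw new UnsupportedOperationException("no real process in this sketch");
  }

  // Turns the exit code into a status string, keeping the code in the failure text.
  String run()
  {
    final int exitCode = waitForProcessToComplete();
    return exitCode == 0
           ? "SUCCESS"
           : "FAILED: process exited with code[" + exitCode + "]";
  }
}

class ProcessRunnerDemo
{
  // Builds a runner whose "process" finishes immediately with the given exit code.
  static ProcessRunner runnerExitingWith(final int exitCode)
  {
    return new ProcessRunner()
    {
      @Override
      int waitForProcessToComplete()
      {
        return exitCode;
      }
    };
  }

  public static void main(String[] args)
  {
    System.out.println(runnerExitingWith(1).run());
    System.out.println(runnerExitingWith(0).run());
  }
}
```

With the wait isolated like this, a unit test only needs to override one method to exercise the failure branch.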
   
   While working on this PR, I remembered an issue with `TaskRunnerListener`: it is sometimes fed a wrong taskStatus, although `TaskRunnerListener` is barely in use. I created https://github.com/apache/druid/issues/11445 to track this issue.
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672154705



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
##########
@@ -364,38 +362,19 @@ public TaskStatus call()
                       );
 
                       LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
-                      boolean runFailed = true;
-
-                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-
-                      // This will block for a while. So we append the thread information with more details
-                      final String priorThreadName = Thread.currentThread().getName();
-                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-
-                      try (final OutputStream toLogfile = logSink.openStream()) {
-                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                        final int statusCode = processHolder.process.waitFor();
-                        LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                        if (statusCode == 0) {
-                          runFailed = false;
-                        }
-                      }
-                      finally {
-                        Thread.currentThread().setName(priorThreadName);
-                        // Upload task logs
-                        taskLogPusher.pushTaskLog(task.getId(), logFile);
-                        if (reportsFile.exists()) {
-                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                        }
-                      }
-
-                      TaskStatus status;
-                      if (!runFailed) {
+                      final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
+                      final TaskStatus status;
+                      if (exitCode == 0) {
+                        LOGGER.info("Process exited successfully for task: %s", task.getId());
                         // Process exited successfully
                         status = jsonMapper.readValue(statusFile, TaskStatus.class);
                       } else {
+                        LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
                         // Process exited unsuccessfully
-                        status = TaskStatus.failure(task.getId());
+                        status = TaskStatus.failure(
+                            task.getId(),
+                            "Task execution process exited unsuccessfully. See middleManager logs for more details."

Review comment:
       do you want to add the exit code in the message here? 
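
A minimal sketch of what this suggestion could look like, assuming a hypothetical helper `TaskFailureMessages.forExitCode` that does not exist in Druid. The wording follows the expected string quoted later in this thread; how the message is actually built in `ForkingTaskRunner` is not shown in this hunk.

```java
import java.util.Locale;

// Hypothetical helper, not part of Druid: composes the failure message with the exit code.
final class TaskFailureMessages
{
  private TaskFailureMessages()
  {
  }

  // Formats the exit code into the message so it shows up in the task status itself.
  static String forExitCode(int exitCode)
  {
    return String.format(
        Locale.ENGLISH,
        "Task execution process exited unsuccessfully with code[%d]. See middleManager logs for more details.",
        exitCode
    );
  }

  public static void main(String[] args)
  {
    System.out.println(forExitCode(1));
  }
}
```

The result would then be passed as the second argument of `TaskStatus.failure(task.getId(), ...)`, as in the hunk above.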






[GitHub] [druid] loquisgon commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672722320



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
##########
@@ -145,12 +162,192 @@ public void testMaskedIterator()
             "-Dsome.somepassword = secret=value",
             "-Dsome.some=notasecret",
             "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
-            ),
-        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
-            "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+        ),
+        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+        + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
     );
     StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
-    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
-    Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        null,
+        new WorkerConfig(),
+        null,
+        null,
+        null,
+        null,
+        startupLoggingConfig
+    );
+    Assert.assertEquals(
+        originalAndExpectedCommand.rhs,
+        forkingTaskRunner.getMaskedCommand(
+            startupLoggingConfig.getMaskProperties(),
+            originalAndExpectedCommand.lhs
+        )
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+  {
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        new DefaultObjectMapper(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
+      {
+        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+        Mockito.doNothing().when(processHolder).shutdown();
+        return processHolder;
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      {
+        // Emulate task process failure
+        return 1;
+      }
+    };
+
+    final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Task execution process exited unsuccessfully. See middleManager logs for more details.",

Review comment:
       Now that you added the exit code to the message, you need to update this expected message as well.

Review comment:
       "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details."






[GitHub] [druid] jihoonson commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672676958



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
##########
@@ -364,38 +362,19 @@ public TaskStatus call()
                       );
 
                       LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
-                      boolean runFailed = true;
-
-                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-
-                      // This will block for a while. So we append the thread information with more details
-                      final String priorThreadName = Thread.currentThread().getName();
-                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-
-                      try (final OutputStream toLogfile = logSink.openStream()) {
-                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                        final int statusCode = processHolder.process.waitFor();
-                        LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                        if (statusCode == 0) {
-                          runFailed = false;
-                        }
-                      }
-                      finally {
-                        Thread.currentThread().setName(priorThreadName);
-                        // Upload task logs
-                        taskLogPusher.pushTaskLog(task.getId(), logFile);
-                        if (reportsFile.exists()) {
-                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                        }
-                      }
-
-                      TaskStatus status;
-                      if (!runFailed) {
+                      final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
+                      final TaskStatus status;
+                      if (exitCode == 0) {
+                        LOGGER.info("Process exited successfully for task: %s", task.getId());
                         // Process exited successfully
                         status = jsonMapper.readValue(statusFile, TaskStatus.class);
                       } else {
+                        LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
                         // Process exited unsuccessfully
-                        status = TaskStatus.failure(task.getId());
+                        status = TaskStatus.failure(
+                            task.getId(),
+                            "Task execution process exited unsuccessfully. See middleManager logs for more details."

Review comment:
       It seems reasonable to include it. Unlike other exceptions thrown in Druid, middleManager logs will probably not provide much information about the process failure. :+1: 

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
##########
@@ -145,12 +162,192 @@ public void testMaskedIterator()
             "-Dsome.somepassword = secret=value",
             "-Dsome.some=notasecret",
             "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
-            ),
-        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
-            "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+        ),
+        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+        + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
     );
     StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
-    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
-    Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        null,
+        new WorkerConfig(),
+        null,
+        null,
+        null,
+        null,
+        startupLoggingConfig
+    );
+    Assert.assertEquals(
+        originalAndExpectedCommand.rhs,
+        forkingTaskRunner.getMaskedCommand(
+            startupLoggingConfig.getMaskProperties(),
+            originalAndExpectedCommand.lhs
+        )
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+  {
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        new DefaultObjectMapper(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
+      {
+        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+        Mockito.doNothing().when(processHolder).shutdown();
+        return processHolder;
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      {
+        // Emulate task process failure
+        return 1;
+      }
+    };
+
+    final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Task execution process exited unsuccessfully. See middleManager logs for more details.",

Review comment:
       Oops thanks :+1: 






[GitHub] [druid] jihoonson commented on pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11446:
URL: https://github.com/apache/druid/pull/11446#issuecomment-883880605


   @abhishekagarwal87 @loquisgon thanks for the review :+1: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672676958



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
##########
@@ -364,38 +362,19 @@ public TaskStatus call()
                       );
 
                       LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
-                      boolean runFailed = true;
-
-                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-
-                      // This will block for a while. So we append the thread information with more details
-                      final String priorThreadName = Thread.currentThread().getName();
-                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-
-                      try (final OutputStream toLogfile = logSink.openStream()) {
-                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                        final int statusCode = processHolder.process.waitFor();
-                        LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                        if (statusCode == 0) {
-                          runFailed = false;
-                        }
-                      }
-                      finally {
-                        Thread.currentThread().setName(priorThreadName);
-                        // Upload task logs
-                        taskLogPusher.pushTaskLog(task.getId(), logFile);
-                        if (reportsFile.exists()) {
-                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                        }
-                      }
-
-                      TaskStatus status;
-                      if (!runFailed) {
+                      final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
+                      final TaskStatus status;
+                      if (exitCode == 0) {
+                        LOGGER.info("Process exited successfully for task: %s", task.getId());
                         // Process exited successfully
                         status = jsonMapper.readValue(statusFile, TaskStatus.class);
                       } else {
+                        LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
                         // Process exited unsuccessfully
-                        status = TaskStatus.failure(task.getId());
+                        status = TaskStatus.failure(
+                            task.getId(),
+                            "Task execution process exited unsuccessfully. See middleManager logs for more details."

Review comment:
       It seems reasonable to include it. Unlike other exceptions thrown in Druid, middleManager logs will probably not provide much information about the process failure. :+1: 
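
       A minimal sketch of what including the exit code could look like. `ExitCodeMessage` and `failureMessage` are hypothetical names used only for illustration, not Druid APIs; the wording follows the message suggested elsewhere in this thread and may differ from the merged code.

```java
import java.util.Locale;

public final class ExitCodeMessage
{
  private ExitCodeMessage() {}

  // Hypothetical helper (not part of Druid): builds the errorMessage
  // for a task process that exited with a non-zero code.
  public static String failureMessage(int exitCode)
  {
    return String.format(
        Locale.ROOT,
        "Task execution process exited unsuccessfully with code[%d]. "
        + "See middleManager logs for more details.",
        exitCode
    );
  }

  public static void main(String[] args)
  {
    System.out.println(failureMessage(1));
  }
}
```

       The resulting string would then be passed as the second argument to `TaskStatus.failure(task.getId(), ...)`, as in the diff above.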






[GitHub] [druid] loquisgon commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672722320



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
##########
@@ -145,12 +162,192 @@ public void testMaskedIterator()
             "-Dsome.somepassword = secret=value",
             "-Dsome.some=notasecret",
             "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
-            ),
-        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
-            "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+        ),
+        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+        + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
     );
     StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
-    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
-    Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        null,
+        new WorkerConfig(),
+        null,
+        null,
+        null,
+        null,
+        startupLoggingConfig
+    );
+    Assert.assertEquals(
+        originalAndExpectedCommand.rhs,
+        forkingTaskRunner.getMaskedCommand(
+            startupLoggingConfig.getMaskProperties(),
+            originalAndExpectedCommand.lhs
+        )
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+  {
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        new DefaultObjectMapper(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
+      {
+        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+        Mockito.doNothing().when(processHolder).shutdown();
+        return processHolder;
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      {
+        // Emulate task process failure
+        return 1;
+      }
+    };
+
+    final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Task execution process exited unsuccessfully. See middleManager logs for more details.",

Review comment:
       Now that you added the exit code to the message, you need to update this expected message as well.

Review comment:
       "Task execution process exited unsuccessfully with code[1]. See middleManager logs for more details."
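
       The test quoted above works by overriding `waitForTaskProcessToComplete` to emulate a failing process. A stripped-down sketch of that seam follows, assuming hypothetical stand-in classes (`SeamDemo`, `Runner`) rather than the real `ForkingTaskRunner`, and plain strings rather than `TaskStatus`:

```java
public class SeamDemo
{
  // Hypothetical stand-in for ForkingTaskRunner; only the overridable seam is modeled.
  static class Runner
  {
    // In production code this would block until the forked process exits.
    int waitForTaskProcessToComplete()
    {
      return 0;
    }

    String run()
    {
      final int exitCode = waitForTaskProcessToComplete();
      if (exitCode == 0) {
        return "SUCCESS";
      }
      return "FAILED: Task execution process exited unsuccessfully with code[" + exitCode + "]. "
             + "See middleManager logs for more details.";
    }
  }

  // Builds a Runner whose seam reports the given exit code, then runs it.
  static String runWithExitCode(final int exitCode)
  {
    final Runner runner = new Runner()
    {
      @Override
      int waitForTaskProcessToComplete()
      {
        // Emulate the task process exiting with the requested code
        return exitCode;
      }
    };
    return runner.run();
  }

  public static void main(String[] args)
  {
    System.out.println(runWithExitCode(1));
  }
}
```

       Because the override returns a fixed exit code, the expected message in the assertion has to carry the same code, which is the point of the comment above.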






[GitHub] [druid] jihoonson merged pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #11446:
URL: https://github.com/apache/druid/pull/11446


   




[GitHub] [druid] jihoonson commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672728193



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
##########
@@ -145,12 +162,192 @@ public void testMaskedIterator()
             "-Dsome.somepassword = secret=value",
             "-Dsome.some=notasecret",
             "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
-            ),
-        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
-            "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+        ),
+        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+        + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
     );
     StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
-    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
-    Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        null,
+        new WorkerConfig(),
+        null,
+        null,
+        null,
+        null,
+        startupLoggingConfig
+    );
+    Assert.assertEquals(
+        originalAndExpectedCommand.rhs,
+        forkingTaskRunner.getMaskedCommand(
+            startupLoggingConfig.getMaskProperties(),
+            originalAndExpectedCommand.lhs
+        )
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+  {
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        new DefaultObjectMapper(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
+      {
+        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+        Mockito.doNothing().when(processHolder).shutdown();
+        return processHolder;
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      {
+        // Emulate task process failure
+        return 1;
+      }
+    };
+
+    final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Task execution process exited unsuccessfully. See middleManager logs for more details.",

Review comment:
       Oops, thanks :+1:






[GitHub] [druid] loquisgon commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
loquisgon commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r673458541



##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
##########
@@ -145,12 +162,192 @@ public void testMaskedIterator()
             "-Dsome.somepassword = secret=value",
             "-Dsome.some=notasecret",
             "-Dsome.otherSecret= =asfdhkj352872598====fasdlkjfa="
-            ),
-        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random " +
-            "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
+        ),
+        "java -cp /path/to/somewhere:some-jars.jar /some===file /asecretFileNa=<masked> -Dsome.property=random -Dsome.otherproperty = random=random "
+        + "-Dsome.somesecret =<masked> -Dsome.somesecret=<masked> -Dsome.somepassword =<masked> -Dsome.some=notasecret -Dsome.otherSecret=<masked>"
     );
     StartupLoggingConfig startupLoggingConfig = new StartupLoggingConfig();
-    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(new ForkingTaskRunnerConfig(), null, new WorkerConfig(), null, null, null, null, startupLoggingConfig);
-    Assert.assertEquals(originalAndExpectedCommand.rhs, forkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), originalAndExpectedCommand.lhs));
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        null,
+        new WorkerConfig(),
+        null,
+        null,
+        null,
+        null,
+        startupLoggingConfig
+    );
+    Assert.assertEquals(
+        originalAndExpectedCommand.rhs,
+        forkingTaskRunner.getMaskedCommand(
+            startupLoggingConfig.getMaskProperties(),
+            originalAndExpectedCommand.lhs
+        )
+    );
+  }
+
+  @Test
+  public void testTaskStatusWhenTaskProcessFails() throws ExecutionException, InterruptedException
+  {
+    ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
+        new ForkingTaskRunnerConfig(),
+        new TaskConfig(
+            null,
+            null,
+            null,
+            null,
+            ImmutableList.of(),
+            false,
+            new Period("PT0S"),
+            new Period("PT10S"),
+            ImmutableList.of(),
+            false,
+            false
+        ),
+        new WorkerConfig(),
+        new Properties(),
+        new NoopTaskLogs(),
+        new DefaultObjectMapper(),
+        new DruidNode("middleManager", "host", false, 8091, null, true, false),
+        new StartupLoggingConfig()
+    )
+    {
+      @Override
+      ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
+      {
+        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
+        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
+        Mockito.doNothing().when(processHolder).shutdown();
+        return processHolder;
+      }
+
+      @Override
+      int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
+      {
+        // Emulate task process failure
+        return 1;
+      }
+    };
+
+    final TaskStatus status = forkingTaskRunner.run(NoopTask.create()).get();
+    Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(
+        "Task execution process exited unsuccessfully. See middleManager logs for more details.",

Review comment:
       LGTM 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #11446: Add errorMessage in taskStatus for task failures in middleManagers/indexers/peons

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #11446:
URL: https://github.com/apache/druid/pull/11446#discussion_r672154705



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
##########
@@ -364,38 +362,19 @@ public TaskStatus call()
                       );
 
                       LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
-                      boolean runFailed = true;
-
-                      final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
-
-                      // This will block for a while. So we append the thread information with more details
-                      final String priorThreadName = Thread.currentThread().getName();
-                      Thread.currentThread().setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
-
-                      try (final OutputStream toLogfile = logSink.openStream()) {
-                        ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
-                        final int statusCode = processHolder.process.waitFor();
-                        LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
-                        if (statusCode == 0) {
-                          runFailed = false;
-                        }
-                      }
-                      finally {
-                        Thread.currentThread().setName(priorThreadName);
-                        // Upload task logs
-                        taskLogPusher.pushTaskLog(task.getId(), logFile);
-                        if (reportsFile.exists()) {
-                          taskLogPusher.pushTaskReports(task.getId(), reportsFile);
-                        }
-                      }
-
-                      TaskStatus status;
-                      if (!runFailed) {
+                      final int exitCode = waitForTaskProcessToComplete(task, processHolder, logFile, reportsFile);
+                      final TaskStatus status;
+                      if (exitCode == 0) {
+                        LOGGER.info("Process exited successfully for task: %s", task.getId());
                         // Process exited successfully
                         status = jsonMapper.readValue(statusFile, TaskStatus.class);
                       } else {
+                        LOGGER.error("Process exited with code[%d] for task: %s", exitCode, task.getId());
                         // Process exited unsuccessfully
-                        status = TaskStatus.failure(task.getId());
+                        status = TaskStatus.failure(
+                            task.getId(),
+                            "Task execution process exited unsuccessfully. See middleManager logs for more details."

Review comment:
       Do you want to add the exit code to the message here?





