You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/04/14 15:37:22 UTC

[incubator-druid] branch master updated: run pending tasks when assigned a task that is already pending (#6991)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a38d28  run pending tasks when assigned a task that is already pending (#6991)
7a38d28 is described below

commit 7a38d28cf37c46662023960b587a43618aa85158
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Sun Apr 14 23:37:15 2019 +0800

    run pending tasks when assigned a task that is already pending (#6991)
    
    * run pending tasks when assigned a task that is already pending
    
    * add unit test
    
    * fix pending tasks aren't going to run
    
    * address comments
---
 .../druid/indexing/overlord/RemoteTaskRunner.java     |  6 ++++--
 .../org/apache/druid/indexing/overlord/TaskQueue.java | 12 ++++++++++++
 .../druid/indexing/overlord/RemoteTaskRunnerTest.java | 19 +++++++++++++++++++
 3 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 4dd15e0..fbe64f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -514,7 +514,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   {
     final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
     if ((pendingTask = pendingTasks.get(task.getId())) != null) {
-      log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
+      log.info("Assigned a task[%s] that is already pending!", task.getId());
+      runPendingTasks();
       return pendingTask.getResult();
     } else if ((runningTask = runningTasks.get(task.getId())) != null) {
       ZkWorker zkWorker = findWorkerRunningTask(task.getId());
@@ -633,7 +634,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   /**
    * Adds a task to the pending queue
    */
-  private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
+  @VisibleForTesting
+  RemoteTaskRunnerWorkItem addPendingTask(final Task task)
   {
     log.info("Added pending task %s", task.getId());
     final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index b583d07..327dd4d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -271,6 +271,11 @@ public class TaskQueue
               }
             }
             taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
+          } else if (isTaskPending(task)) {
+            // if taskFutures already contains this task and the task is still pending, ask the taskRunner
+            // to run it again to guarantee that it will be assigned to run
+            // see https://github.com/apache/incubator-druid/pull/6991
+            taskRunner.run(task);
           }
         }
         // Kill tasks that shouldn't be running
@@ -315,6 +320,13 @@ public class TaskQueue
     }
   }
 
+  private boolean isTaskPending(Task task)
+  {
+    return taskRunner.getPendingTasks()
+                     .stream()
+                     .anyMatch(workItem -> workItem.getTaskId().equals(task.getId()));
+  }
+
   /**
    * Adds some work to the queue and the underlying task storage facility with a generic "running" status.
    *
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 49a0a88..15d87d4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -114,6 +114,25 @@ public class RemoteTaskRunnerTest
   }
 
   @Test
+  public void testRunTaskThatAlreadyPending() throws Exception
+  {
+    doSetup();
+    remoteTaskRunner.addPendingTask(task);
+    Assert.assertFalse(workerRunningTask(task.getId()));
+
+    ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
+
+    Assert.assertTrue(taskAnnounced(task.getId()));
+    mockWorkerRunningTask(task);
+    Assert.assertTrue(workerRunningTask(task.getId()));
+    mockWorkerCompleteSuccessfulTask(task);
+    Assert.assertTrue(workerCompletedTask(result));
+
+    Assert.assertEquals(task.getId(), result.get().getId());
+    Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
+  }
+
+  @Test
   public void testStartWithNoWorker()
   {
     makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org