You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2019/04/14 15:37:22 UTC
[incubator-druid] branch master updated: run pending tasks when
assigned a task that is already pending (#6991)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7a38d28 run pending tasks when assigned a task that is already pending (#6991)
7a38d28 is described below
commit 7a38d28cf37c46662023960b587a43618aa85158
Author: Mingming Qiu <cs...@gmail.com>
AuthorDate: Sun Apr 14 23:37:15 2019 +0800
run pending tasks when assigned a task that is already pending (#6991)
* run pending tasks when assigned a task that is already pending
* add unit test
* fix pending tasks aren't going to run
* address comments
---
.../druid/indexing/overlord/RemoteTaskRunner.java | 6 ++++--
.../org/apache/druid/indexing/overlord/TaskQueue.java | 12 ++++++++++++
.../druid/indexing/overlord/RemoteTaskRunnerTest.java | 19 +++++++++++++++++++
3 files changed, 35 insertions(+), 2 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 4dd15e0..fbe64f2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -514,7 +514,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
if ((pendingTask = pendingTasks.get(task.getId())) != null) {
- log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
+ log.info("Assigned a task[%s] that is already pending!", task.getId());
+ runPendingTasks();
return pendingTask.getResult();
} else if ((runningTask = runningTasks.get(task.getId())) != null) {
ZkWorker zkWorker = findWorkerRunningTask(task.getId());
@@ -633,7 +634,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
/**
* Adds a task to the pending queue
*/
- private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
+ @VisibleForTesting
+ RemoteTaskRunnerWorkItem addPendingTask(final Task task)
{
log.info("Added pending task %s", task.getId());
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index b583d07..327dd4d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -271,6 +271,11 @@ public class TaskQueue
}
}
taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
+ } else if (isTaskPending(task)) {
+ // if taskFutures already contains this task and the task is still pending, ask the taskRunner
+ // to run it again to guarantee that it will be assigned to run
+ // see https://github.com/apache/incubator-druid/pull/6991
+ taskRunner.run(task);
}
}
// Kill tasks that shouldn't be running
@@ -315,6 +320,13 @@ public class TaskQueue
}
}
+ private boolean isTaskPending(Task task)
+ {
+ return taskRunner.getPendingTasks()
+ .stream()
+ .anyMatch(workItem -> workItem.getTaskId().equals(task.getId()));
+ }
+
/**
* Adds some work to the queue and the underlying task storage facility with a generic "running" status.
*
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index 49a0a88..15d87d4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -114,6 +114,25 @@ public class RemoteTaskRunnerTest
}
@Test
+ public void testRunTaskThatAlreadyPending() throws Exception
+ {
+ doSetup();
+ remoteTaskRunner.addPendingTask(task);
+ Assert.assertFalse(workerRunningTask(task.getId()));
+
+ ListenableFuture<TaskStatus> result = remoteTaskRunner.run(task);
+
+ Assert.assertTrue(taskAnnounced(task.getId()));
+ mockWorkerRunningTask(task);
+ Assert.assertTrue(workerRunningTask(task.getId()));
+ mockWorkerCompleteSuccessfulTask(task);
+ Assert.assertTrue(workerCompletedTask(result));
+
+ Assert.assertEquals(task.getId(), result.get().getId());
+ Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());
+ }
+
+ @Test
public void testStartWithNoWorker()
{
makeRemoteTaskRunner(new TestRemoteTaskRunnerConfig(new Period("PT1S")));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org