You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/12/13 01:14:20 UTC
[incubator-druid] branch 0.17.0-incubating updated: HRTR: make
pending task execution handling to go through all tasks on not finding
worker slots (#8697) (#9022)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.17.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.17.0-incubating by this push:
new b8a7b99 HRTR: make pending task execution handling to go through all tasks on not finding worker slots (#8697) (#9022)
b8a7b99 is described below
commit b8a7b99573d462c8b2f887d3ee934e01a3bacc69
Author: Himanshu <g....@gmail.com>
AuthorDate: Thu Dec 12 17:14:07 2019 -0800
HRTR: make pending task execution handling to go through all tasks on not finding worker slots (#8697) (#9022)
* HRTR: make pending task execution handling go through all pending tasks
when no worker slot is found for a task
* make HRTR methods that are meant to be used only in HttpRemoteTaskRunnerResource package-private
* mark the fields of the HttpRemoteTaskRunnerWorkItem.State enum final
* hrtr: move the immutableWorker NULL check outside of the try-catch; otherwise the finally block could throw an NPE
* add some explanatory comments
* add a comment explaining the mechanics of handing off a pending task from submission until it is picked up by a task execution thread
* fix spelling
---
.../druid/java/util/emitter/EmittingLogger.java | 4 +-
.../indexing/overlord/TaskRunnerWorkItem.java | 1 +
.../config/HttpRemoteTaskRunnerConfig.java | 8 -
.../overlord/hrtr/HttpRemoteTaskRunner.java | 371 +++++++++++++++------
.../hrtr/HttpRemoteTaskRunnerResource.java | 72 +++-
.../druid/indexing/overlord/hrtr/WorkerHolder.java | 16 +
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 6 +
7 files changed, 362 insertions(+), 116 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
index 0f6bb8b..7ad5d10 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
@@ -83,11 +83,13 @@ public class EmittingLogger extends Logger
StringUtils.nonStrictFormat(message, objects)
);
- error(errorMessage);
ISE e = new ISE(errorMessage);
if (t != null) {
e.addSuppressed(t);
}
+
+ error(e, errorMessage);
+
throw e;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java
index e10f250..f9aa665 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java
@@ -86,6 +86,7 @@ public abstract class TaskRunnerWorkItem
return queueInsertionTime;
}
+ @JsonProperty
public abstract TaskLocation getLocation();
/**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java
index b0d91bc..bc0ba7f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java
@@ -31,9 +31,6 @@ public class HttpRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
private int workerSyncNumThreads = 5;
@JsonProperty
- private Period waitForWorkerSlot = new Period("PT1M");
-
- @JsonProperty
private int shutdownRequestMaxRetries = 3;
@JsonProperty
@@ -56,11 +53,6 @@ public class HttpRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
return workerSyncNumThreads;
}
- public Period getWaitForWorkerSlot()
- {
- return waitForWorkerSlot;
- }
-
public int getShutdownRequestMaxRetries()
{
return shutdownRequestMaxRetries;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 3020d3c..e5549f6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord.hrtr;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
@@ -36,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableScheduledFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.discovery.DiscoveryDruidNode;
@@ -106,7 +108,7 @@ import java.util.stream.Collectors;
/**
* A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider})
* to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and exposes 3 HTTP endpoints
+ * Middle Managers manages list of assigned/completed tasks on disk and expose 3 HTTP endpoints
* 1. POST request for assigning a task
* 2. POST request for shutting down a task
* 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status.
@@ -126,10 +128,18 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// Executor for assigning pending tasks to workers.
private final ExecutorService pendingTasksExec;
- // All known tasks
+ // All known tasks, TaskID -> HttpRemoteTaskRunnerWorkItem
+ // This is a ConcurrentMap as some of the reads are done without holding the lock.
+ @GuardedBy("statusLock")
private final ConcurrentMap<String, HttpRemoteTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
- // All discovered workers.
+ // This is the list of pending tasks in the order they arrived, exclusively manipulated/used by thread that
+ // gives a new task to this class and threads in pendingTasksExec that are responsible for assigning tasks to
+ // workers.
+ @GuardedBy("statusLock")
+ private final List<String> pendingTaskIds = new ArrayList<>();
+
+ // All discovered workers, "host:port" -> WorkerHolder
private final ConcurrentMap<String, WorkerHolder> workers = new ConcurrentHashMap<>();
// Executor for syncing state of each worker.
@@ -143,15 +153,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// workers which were assigned a task and are yet to acknowledge same.
// Map: workerId -> taskId
+ // all writes are guarded
+ @GuardedBy("statusLock")
private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<>();
// Executor to complete cleanup of workers which have disappeared.
private final ListeningScheduledExecutorService cleanupExec;
private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
- // Guards the pending/running/complete lists of tasks and list of workers
- // statusLock.notifyAll() is called whenever there is a possibility of worker slot to run task becoming available.
- // statusLock.notifyAll() is called whenever a task status or location changes.
+
private final Object statusLock = new Object();
// task runner listeners
@@ -243,6 +253,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
provisioningService = provisioningStrategy.makeProvisioningService(this);
scheduleSyncMonitoring();
+
+ startPendingTaskHandling();
+
lifecycleLock.started();
log.info("Started.");
@@ -313,6 +326,31 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
);
}
+ /**
+ * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+ */
+ @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+ Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks()
+ {
+ // In this class, this method is called with statusLock held.
+ // writes to workersWithUnacknowledgedTask are always guarded by statusLock.
+ // however writes to lazyWorker/blacklistedWorkers aren't necessarily guarded by same lock, so technically there
+ // could be races in that a task could get assigned to a worker which in another thread is concurrently being
+ // marked lazy/blacklisted , but that is ok because that is equivalent to this worker being picked for task and
+ // being assigned lazy/blacklisted right after even when the two threads hold a mutually exclusive lock.
+ return Maps.transformEntries(
+ Maps.filterEntries(
+ workers,
+ input -> !lazyWorkers.containsKey(input.getKey()) &&
+ !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+ !blackListedWorkers.containsKey(input.getKey()) &&
+ input.getValue().isInitialized() &&
+ input.getValue().isEnabled()
+ ),
+ (String key, WorkerHolder value) -> value.toImmutable()
+ );
+ }
+
private ImmutableWorkerInfo findWorkerToRunTask(Task task)
{
WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@@ -326,17 +364,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
return strategy.findWorkerForTask(
config,
- ImmutableMap.copyOf(
- Maps.transformEntries(
- Maps.filterEntries(
- workers,
- input -> !lazyWorkers.containsKey(input.getKey()) &&
- !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
- !blackListedWorkers.containsKey(input.getKey())
- ),
- (String key, WorkerHolder value) -> value.toImmutable()
- )
- ),
+ ImmutableMap.copyOf(getWorkersEligibleToRunTasks()),
task
);
}
@@ -344,7 +372,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private boolean runTaskOnWorker(
final HttpRemoteTaskRunnerWorkItem workItem,
final String workerHost
- ) throws Exception
+ ) throws InterruptedException
{
String taskId = workItem.getTaskId();
WorkerHolder workerHolder = workers.get(workerHost);
@@ -363,8 +391,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
long waitStart = System.currentTimeMillis();
boolean isTaskAssignmentTimedOut = false;
synchronized (statusLock) {
- while (tasks.containsKey(taskId)
- && tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) {
+ while (tasks.containsKey(taskId) && tasks.get(taskId).getState().isPending()) {
long remaining = waitMs - (System.currentTimeMillis() - waitStart);
if (remaining > 0) {
statusLock.wait(remaining);
@@ -382,6 +409,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskId,
config.getTaskAssignmentTimeout()
).emit();
+ // taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
}
@@ -611,6 +639,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
taskItem.getTaskId(),
config.getTaskCleanupTimeout()
);
+ // taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
}
}
@@ -689,7 +718,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
* This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant
* for that use only. It must not be used for any other purpose.
*/
- public Map<String, Object> getDebugInfo()
+ Map<String, Object> getWorkerSyncerDebugInfo()
{
Preconditions.checkArgument(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
@@ -849,7 +878,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
synchronized (statusLock) {
return tasks.values()
.stream()
- .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
+ .filter(item -> item.getState().isPending())
.map(HttpRemoteTaskRunnerWorkItem::getTask)
.collect(Collectors.toList());
}
@@ -858,6 +887,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Override
public Optional<ByteSource> streamTaskLog(String taskId, long offset)
{
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
Worker worker = null;
if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
@@ -904,6 +934,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Override
public Optional<ByteSource> streamTaskReports(String taskId)
{
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
Worker worker = null;
if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
@@ -1019,92 +1050,175 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
HttpRemoteTaskRunnerWorkItem.State.PENDING
);
tasks.put(task.getId(), taskRunnerWorkItem);
- addPendingTaskToExecutor(task.getId());
+ pendingTaskIds.add(task.getId());
+
+ statusLock.notifyAll();
+
return taskRunnerWorkItem.getResult();
}
}
}
- private void addPendingTaskToExecutor(final String taskId)
+ private void startPendingTaskHandling()
{
- pendingTasksExec.execute(
- () -> {
- while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
- ImmutableWorkerInfo immutableWorker;
- HttpRemoteTaskRunnerWorkItem taskItem = null;
+ for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+ pendingTasksExec.submit(
+ () -> {
try {
- synchronized (statusLock) {
- taskItem = tasks.get(taskId);
+ if (!lifecycleLock.awaitStarted()) {
+ log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit();
+ return;
+ }
- if (taskItem == null) {
- log.info(
- "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.",
- taskId
- );
- return;
- }
+ pendingTasksExecutionLoop();
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run")
+ .emit();
+ }
+ finally {
+ log.info("PendingTaskExecution loop exited.");
+ }
+ }
+ );
+ }
+ }
- if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) {
- log.info(
- "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.",
- taskId,
- taskItem.getState()
- );
- return;
- }
+ private void pendingTasksExecutionLoop()
+ {
+ while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+ try {
+ // Find one pending task to run and a worker to run on
+ HttpRemoteTaskRunnerWorkItem taskItem = null;
+ ImmutableWorkerInfo immutableWorker = null;
- if (taskItem.getTask() == null) {
- throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId);
- }
- immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
- if (immutableWorker == null) {
- // no free worker, wait for some worker to become free
- statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
- continue;
- } else if (workersWithUnacknowledgedTask.putIfAbsent(
- immutableWorker.getWorker().getHost(),
- taskId
- ) != null) {
- // there was a race and someone else took this worker slot, try again
- continue;
- }
- }
+ synchronized (statusLock) {
+ Iterator<String> iter = pendingTaskIds.iterator();
+ while (iter.hasNext()) {
+ String taskId = iter.next();
+ HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+ if (ti == null || !ti.getState().isPending()) {
+ // happens if the task was shutdown, failed or observed running by a worker
+ iter.remove();
+ continue;
+ }
- try {
- // this will send HTTP request to worker for assigning task and hence kept
- // outside the synchronized block.
- if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) {
- return;
- }
- }
- finally {
- workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
- synchronized (statusLock) {
- statusLock.notifyAll();
- }
- }
+ if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+ // picked up by another pending task executor thread which is in the process of trying to
+ // run it on a worker, skip to next.
+ continue;
}
- catch (InterruptedException ex) {
- log.info("Got InterruptedException while assigning task[%s].", taskId);
- Thread.currentThread().interrupt();
- return;
+ if (ti.getTask() == null) {
+ // this is not supposed to happen except for a bug, we want to mark this task failed but
+ // taskComplete(..) can not be called while holding statusLock. See the javadoc on that
+ // method.
+ // so this will get marked failed afterwards outside of current synchronized block.
+ taskItem = ti;
+ break;
}
- catch (Throwable th) {
- log.makeAlert(th, "Exception while trying to assign task")
- .addData("taskId", taskId)
- .emit();
- if (taskItem != null) {
- taskComplete(taskItem, null, TaskStatus.failure(taskId));
- }
+ immutableWorker = findWorkerToRunTask(ti.getTask());
+ if (immutableWorker == null) {
+ continue;
+ }
+
+ String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent(
+ immutableWorker.getWorker().getHost(),
+ taskId
+ );
+ if (prevUnackedTaskId != null) {
+ log.makeAlert(
+ "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].",
+ immutableWorker.getWorker().getHost(),
+ prevUnackedTaskId,
+ taskId
+ ).emit();
+ }
+
+ // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
+ // up by another task execution thread.
+ // note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
+ // back if this thread couldn't run this task for any reason, which we will know at some later time
+ // and also we will need to add it back to its old position in the list. that becomes complex quickly.
+ // Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
+ // And, it is automatically removed by any of the task execution threads when they notice that
+ // ti.getState().isPending() is false (at the beginning of this loop)
+ ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
+ taskItem = ti;
+ break;
+ }
+
+ if (taskItem == null) {
+ // Either no pending task is found or no suitable worker is found for any of the pending tasks.
+ // statusLock.notifyAll() is called whenever a new task shows up or if there is a possibility for a task
+ // to successfully get worker to run, for example when a new worker shows up, a task slot opens up
+ // because some task completed etc.
+ statusLock.wait(TimeUnit.MINUTES.toMillis(1));
+ continue;
+ }
+ }
+
+ String taskId = taskItem.getTaskId();
+
+ if (taskItem.getTask() == null) {
+ log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
+ // taskComplete(..) must be called outside of statusLock, see comments on method.
+ taskComplete(taskItem, null, TaskStatus.failure(taskId));
+ continue;
+ }
+
+ if (immutableWorker == null) {
+ throw new ISE("WTH! NULL immutableWorker");
+ }
- return;
+ try {
+ // this will send HTTP request to worker for assigning task
+ if (!runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) {
+ if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+ taskItem.revertStateFromPendingWorkerAssignToPending();
}
}
}
- );
+ catch (InterruptedException ex) {
+ log.info("Got InterruptedException while assigning task[%s].", taskId);
+ throw ex;
+ }
+ catch (Throwable th) {
+ log.makeAlert(th, "Exception while trying to assign task")
+ .addData("taskId", taskId)
+ .emit();
+
+ // taskComplete(..) must be called outside of statusLock, see comments on method.
+ taskComplete(taskItem, null, TaskStatus.failure(taskId));
+ }
+ finally {
+ synchronized (statusLock) {
+ workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
+ statusLock.notifyAll();
+ }
+ }
+
+ }
+ catch (InterruptedException ex) {
+ log.info("Interrupted, will Exit.");
+ Thread.currentThread().interrupt();
+ }
+ catch (Throwable th) {
+ log.makeAlert(th, "Unknown Exception while trying to assign tasks.").emit();
+ }
+ }
+ }
+
+ /**
+ * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+ */
+ List<String> getPendingTasksList()
+ {
+ synchronized (statusLock) {
+ return ImmutableList.copyOf(pendingTaskIds);
+ }
}
@Override
@@ -1161,6 +1275,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
{
return tasks.values()
@@ -1170,11 +1285,12 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
{
return tasks.values()
.stream()
- .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
+ .filter(item -> item.getState().isPending())
.collect(Collectors.toList());
}
@@ -1186,6 +1302,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
public Collection<? extends TaskRunnerWorkItem> getCompletedTasks()
{
return tasks.values()
@@ -1196,26 +1313,19 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
@Nullable
@Override
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
public RunnerTaskState getRunnerTaskState(String taskId)
{
final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
if (workItem == null) {
return null;
} else {
- switch (workItem.state) {
- case PENDING:
- return RunnerTaskState.PENDING;
- case RUNNING:
- return RunnerTaskState.RUNNING;
- case COMPLETE:
- return RunnerTaskState.NONE;
- default:
- throw new ISE("Unknown state[%s]", workItem.state);
- }
+ return workItem.getState().toRunnerTaskState();
}
}
@Override
+ @SuppressWarnings("GuardedBy") // Read on tasks is safe
public TaskLocation getTaskLocation(String taskId)
{
final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
@@ -1233,6 +1343,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
).collect(Collectors.toList());
}
+ /**
+ * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only.
+ */
+ @SuppressWarnings("GuardedBy")
+ Map<String, String> getWorkersWithUnacknowledgedTasks()
+ {
+ return workersWithUnacknowledgedTask;
+ }
+
@Override
public Optional<ScalingStats> getScalingStats()
{
@@ -1313,6 +1432,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
case RUNNING:
switch (taskItem.getState()) {
case PENDING:
+ case PENDING_WORKER_ASSIGN:
taskItem.setWorker(worker);
taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());
@@ -1361,6 +1481,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
case SUCCESS:
switch (taskItem.getState()) {
case PENDING:
+ case PENDING_WORKER_ASSIGN:
taskItem.setWorker(worker);
taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost());
@@ -1413,6 +1534,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
if (isTaskCompleted) {
+ // taskComplete(..) must be called outside of statusLock, see comments on method.
taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
}
@@ -1430,15 +1552,35 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
{
enum State
{
- PENDING(0),
- RUNNING(1),
- COMPLETE(2);
+ // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+ PENDING(0, true, RunnerTaskState.PENDING),
+
+ // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+ // or worker hasn't acknowledged the task yet.
+ PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
- int index;
+ RUNNING(2, false, RunnerTaskState.RUNNING),
+ COMPLETE(3, false, RunnerTaskState.NONE);
- State(int index)
+ private final int index;
+ private final boolean isPending;
+ private final RunnerTaskState runnerTaskState;
+
+ State(int index, boolean isPending, RunnerTaskState runnerTaskState)
{
this.index = index;
+ this.isPending = isPending;
+ this.runnerTaskState = runnerTaskState;
+ }
+
+ boolean isPending()
+ {
+ return isPending;
+ }
+
+ RunnerTaskState toRunnerTaskState()
+ {
+ return runnerTaskState;
}
}
@@ -1478,6 +1620,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
+ @JsonProperty
public State getState()
{
return state;
@@ -1499,7 +1642,25 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
state
);
+ setStateUnconditionally(state);
+ }
+
+ public void revertStateFromPendingWorkerAssignToPending()
+ {
+ Preconditions.checkState(
+ this.state == State.PENDING_WORKER_ASSIGN,
+ "Can't move state from [%s] to [%s]",
+ this.state,
+ State.PENDING
+ );
+
+ setStateUnconditionally(State.PENDING);
+ }
+
+ private void setStateUnconditionally(State state)
+ {
if (log.isDebugEnabled()) {
+ // Exception is logged to know what led to this call.
log.debug(
new RuntimeException("Stacktrace..."),
"Setting task[%s] work item state from [%s] to [%s].",
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java
index 0266ba4..fc2c5ce 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java
@@ -34,6 +34,8 @@ import javax.ws.rs.core.Response;
/**
* Collection of http endpoits to introspect state of HttpRemoteTaskRunner instance for debugging.
+ * Also, generic TaskRunner state can be introspected by the endpoints in
+ * {@link org.apache.druid.indexing.overlord.http.OverlordResource}
*/
@Path("/druid-internal/v1/httpRemoteTaskRunner")
@ResourceFilters(StateResourceFilter.class)
@@ -48,15 +50,42 @@ public class HttpRemoteTaskRunnerResource
}
@GET
+ @Path("/knownTasks")
@Produces(MediaType.APPLICATION_JSON)
- public Response getDebugInfo()
+ public Response getAllKnownTasks()
{
HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
if (httpRemoteTaskRunner == null) {
return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
}
- return Response.ok().entity(httpRemoteTaskRunner.getDebugInfo()).build();
+ return Response.ok().entity(httpRemoteTaskRunner.getKnownTasks()).build();
+ }
+
+ @GET
+ @Path("/pendingTasksQueue")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getPendingTasksQueue()
+ {
+ HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
+ if (httpRemoteTaskRunner == null) {
+ return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+ }
+
+ return Response.ok().entity(httpRemoteTaskRunner.getPendingTasksList()).build();
+ }
+
+ @GET
+ @Path("/workerSyncerDebugInfo")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getWorkerSyncerDebugInfo()
+ {
+ HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
+ if (httpRemoteTaskRunner == null) {
+ return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+ }
+
+ return Response.ok().entity(httpRemoteTaskRunner.getWorkerSyncerDebugInfo()).build();
}
@GET
@@ -72,6 +101,45 @@ public class HttpRemoteTaskRunnerResource
return Response.ok().entity(httpRemoteTaskRunner.getBlacklistedWorkers()).build();
}
+ @GET
+ @Path("/lazyWorkers")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getLazyWorkers()
+ {
+ HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
+ if (httpRemoteTaskRunner == null) {
+ return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+ }
+
+ return Response.ok().entity(httpRemoteTaskRunner.getLazyWorkers()).build();
+ }
+
+ @GET
+ @Path("/workersWithUnacknowledgedTasks")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getWorkersWithUnacknowledgedTasks()
+ {
+ HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
+ if (httpRemoteTaskRunner == null) {
+ return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+ }
+
+ return Response.ok().entity(httpRemoteTaskRunner.getWorkersWithUnacknowledgedTasks()).build();
+ }
+
+ @GET
+ @Path("/workersEilgibleToRunTasks")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getWorkersEilgibleToRunTasks()
+ {
+ HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
+ if (httpRemoteTaskRunner == null) {
+ return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+ }
+
+ return Response.ok().entity(httpRemoteTaskRunner.getWorkersEligibleToRunTasks()).build();
+ }
+
private HttpRemoteTaskRunner getHttpRemoteTaskRunner()
{
Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index cfb6180..630b0f0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -332,6 +332,22 @@ public class WorkerHolder
}
}
+ public boolean isInitialized()
+ {
+ try {
+ return syncer.awaitInitialization(1, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ public boolean isEnabled()
+ {
+ return !disabled.get();
+ }
+
public ChangeRequestHttpSyncer<WorkerHistoryItem> getUnderlyingSyncer()
{
return syncer;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 745f3e5..fe1df77 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1350,6 +1350,12 @@ public class HttpRemoteTaskRunnerTest
}
@Override
+ public boolean isInitialized()
+ {
+ return true;
+ }
+
+ @Override
public void waitForInitialization()
{
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org