You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/12/13 01:14:20 UTC

[incubator-druid] branch 0.17.0-incubating updated: HRTR: make pending task execution handling to go through all tasks on not finding worker slots (#8697) (#9022)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.17.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.17.0-incubating by this push:
     new b8a7b99  HRTR: make pending task execution handling to go through all tasks on not finding worker slots (#8697) (#9022)
b8a7b99 is described below

commit b8a7b99573d462c8b2f887d3ee934e01a3bacc69
Author: Himanshu <g....@gmail.com>
AuthorDate: Thu Dec 12 17:14:07 2019 -0800

    HRTR: make pending task execution handling to go through all tasks on not finding worker slots (#8697) (#9022)
    
    * HRTR: make pending task execution handling go through all pending tasks
    when no worker slot is found for a task
    
    * make HRTR methods that are meant to be used only by HttpRemoteTaskRunnerResource package-private
    
    * mark HttpRemoteTaskRunnerWorkItem.State fields final
    
    * hrtr: move the immutableWorker NULL check outside of the try-catch; otherwise the finally block could throw an NPE
    
    * add some explanatory comments
    
    * add a comment explaining how a pending task is handed off from submission until it is picked up by a task execution thread
    
    * fix spelling
---
 .../druid/java/util/emitter/EmittingLogger.java    |   4 +-
 .../indexing/overlord/TaskRunnerWorkItem.java      |   1 +
 .../config/HttpRemoteTaskRunnerConfig.java         |   8 -
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 371 +++++++++++++++------
 .../hrtr/HttpRemoteTaskRunnerResource.java         |  72 +++-
 .../druid/indexing/overlord/hrtr/WorkerHolder.java |  16 +
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    |   6 +
 7 files changed, 362 insertions(+), 116 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
index 0f6bb8b..7ad5d10 100644
--- a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
+++ b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java
@@ -83,11 +83,13 @@ public class EmittingLogger extends Logger
           StringUtils.nonStrictFormat(message, objects)
       );
 
-      error(errorMessage);
       ISE e = new ISE(errorMessage);
       if (t != null) {
         e.addSuppressed(t);
       }
+
+      error(e, errorMessage);
+
       throw e;
     }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java
index e10f250..f9aa665 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java
@@ -86,6 +86,7 @@ public abstract class TaskRunnerWorkItem
     return queueInsertionTime;
   }
 
+  @JsonProperty
   public abstract TaskLocation getLocation();
 
   /**
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java
index b0d91bc..bc0ba7f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java
@@ -31,9 +31,6 @@ public class HttpRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
   private int workerSyncNumThreads = 5;
 
   @JsonProperty
-  private Period waitForWorkerSlot = new Period("PT1M");
-
-  @JsonProperty
   private int shutdownRequestMaxRetries = 3;
 
   @JsonProperty
@@ -56,11 +53,6 @@ public class HttpRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
     return workerSyncNumThreads;
   }
 
-  public Period getWaitForWorkerSlot()
-  {
-    return waitForWorkerSlot;
-  }
-
   public int getShutdownRequestMaxRetries()
   {
     return shutdownRequestMaxRetries;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 3020d3c..e5549f6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.hrtr;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
@@ -36,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableScheduledFuture;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DiscoveryDruidNode;
@@ -106,7 +108,7 @@ import java.util.stream.Collectors;
 /**
  * A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider})
  * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and exposes 3 HTTP endpoints
+ * Middle Managers manages list of assigned/completed tasks on disk and expose 3 HTTP endpoints
  * 1. POST request for assigning a task
  * 2. POST request for shutting down a task
  * 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status.
@@ -126,10 +128,18 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   // Executor for assigning pending tasks to workers.
   private final ExecutorService pendingTasksExec;
 
-  // All known tasks
+  // All known tasks, TaskID -> HttpRemoteTaskRunnerWorkItem
+  // This is a ConcurrentMap as some of the reads are done without holding the lock.
+  @GuardedBy("statusLock")
   private final ConcurrentMap<String, HttpRemoteTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
 
-  // All discovered workers.
+  // This is the list of pending tasks in the order they arrived, exclusively manipulated/used by thread that
+  // gives a new task to this class and threads in pendingTasksExec that are responsible for assigning tasks to
+  // workers.
+  @GuardedBy("statusLock")
+  private final List<String> pendingTaskIds = new ArrayList<>();
+
+  // All discovered workers, "host:port" -> WorkerHolder
   private final ConcurrentMap<String, WorkerHolder> workers = new ConcurrentHashMap<>();
 
   // Executor for syncing state of each worker.
@@ -143,15 +153,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
   // workers which were assigned a task and are yet to acknowledge same.
   // Map: workerId -> taskId
+  // all writes are guarded
+  @GuardedBy("statusLock")
   private final ConcurrentMap<String, String> workersWithUnacknowledgedTask = new ConcurrentHashMap<>();
 
   // Executor to complete cleanup of workers which have disappeared.
   private final ListeningScheduledExecutorService cleanupExec;
   private final ConcurrentMap<String, ScheduledFuture> removedWorkerCleanups = new ConcurrentHashMap<>();
 
-  // Guards the pending/running/complete lists of tasks and list of workers
-  // statusLock.notifyAll() is called whenever there is a possibility of worker slot to run task becoming available.
-  // statusLock.notifyAll() is called whenever a task status or location changes.
+
   private final Object statusLock = new Object();
 
   // task runner listeners
@@ -243,6 +253,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
       provisioningService = provisioningStrategy.makeProvisioningService(this);
 
       scheduleSyncMonitoring();
+
+      startPendingTaskHandling();
+
       lifecycleLock.started();
 
       log.info("Started.");
@@ -313,6 +326,31 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     );
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe
+  Map<String, ImmutableWorkerInfo> getWorkersEligibleToRunTasks()
+  {
+    // In this class, this method is called with statusLock held.
+    // writes to workersWithUnacknowledgedTask are always guarded by statusLock.
+    // however writes to lazyWorker/blacklistedWorkers aren't necessarily guarded by same lock, so technically there
+    // could be races in that a task could get assigned to a worker which in another thread is concurrently being
+    // marked lazy/blacklisted , but that is ok because that is equivalent to this worker being picked for task and
+    // being assigned lazy/blacklisted right after even when the two threads hold a mutually exclusive lock.
+    return Maps.transformEntries(
+        Maps.filterEntries(
+            workers,
+            input -> !lazyWorkers.containsKey(input.getKey()) &&
+                     !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
+                     !blackListedWorkers.containsKey(input.getKey()) &&
+                     input.getValue().isInitialized() &&
+                     input.getValue().isEnabled()
+        ),
+        (String key, WorkerHolder value) -> value.toImmutable()
+    );
+  }
+
   private ImmutableWorkerInfo findWorkerToRunTask(Task task)
   {
     WorkerBehaviorConfig workerConfig = workerConfigRef.get();
@@ -326,17 +364,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
     return strategy.findWorkerForTask(
         config,
-        ImmutableMap.copyOf(
-            Maps.transformEntries(
-                Maps.filterEntries(
-                    workers,
-                    input -> !lazyWorkers.containsKey(input.getKey()) &&
-                             !workersWithUnacknowledgedTask.containsKey(input.getKey()) &&
-                             !blackListedWorkers.containsKey(input.getKey())
-                ),
-                (String key, WorkerHolder value) -> value.toImmutable()
-            )
-        ),
+        ImmutableMap.copyOf(getWorkersEligibleToRunTasks()),
         task
     );
   }
@@ -344,7 +372,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   private boolean runTaskOnWorker(
       final HttpRemoteTaskRunnerWorkItem workItem,
       final String workerHost
-  ) throws Exception
+  ) throws InterruptedException
   {
     String taskId = workItem.getTaskId();
     WorkerHolder workerHolder = workers.get(workerHost);
@@ -363,8 +391,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
       long waitStart = System.currentTimeMillis();
       boolean isTaskAssignmentTimedOut = false;
       synchronized (statusLock) {
-        while (tasks.containsKey(taskId)
-               && tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) {
+        while (tasks.containsKey(taskId) && tasks.get(taskId).getState().isPending()) {
           long remaining = waitMs - (System.currentTimeMillis() - waitStart);
           if (remaining > 0) {
             statusLock.wait(remaining);
@@ -382,6 +409,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
             taskId,
             config.getTaskAssignmentTimeout()
         ).emit();
+        // taskComplete(..) must be called outside of statusLock, see comments on method.
         taskComplete(workItem, workerHolder, TaskStatus.failure(taskId));
       }
 
@@ -611,6 +639,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
                     taskItem.getTaskId(),
                     config.getTaskCleanupTimeout()
                 );
+                // taskComplete(..) must be called outside of statusLock, see comments on method.
                 taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId()));
               }
             }
@@ -689,7 +718,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
    * This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant
    * for that use only. It must not be used for any other purpose.
    */
-  public Map<String, Object> getDebugInfo()
+  Map<String, Object> getWorkerSyncerDebugInfo()
   {
     Preconditions.checkArgument(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
 
@@ -849,7 +878,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     synchronized (statusLock) {
       return tasks.values()
                   .stream()
-                  .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
+                  .filter(item -> item.getState().isPending())
                   .map(HttpRemoteTaskRunnerWorkItem::getTask)
                   .collect(Collectors.toList());
     }
@@ -858,6 +887,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   @Override
   public Optional<ByteSource> streamTaskLog(String taskId, long offset)
   {
+    @SuppressWarnings("GuardedBy") // Read on tasks is safe
     HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
     Worker worker = null;
     if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
@@ -904,6 +934,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   @Override
   public Optional<ByteSource> streamTaskReports(String taskId)
   {
+    @SuppressWarnings("GuardedBy") // Read on tasks is safe
     HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId);
     Worker worker = null;
     if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) {
@@ -1019,92 +1050,175 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
             HttpRemoteTaskRunnerWorkItem.State.PENDING
         );
         tasks.put(task.getId(), taskRunnerWorkItem);
-        addPendingTaskToExecutor(task.getId());
+        pendingTaskIds.add(task.getId());
+
+        statusLock.notifyAll();
+
         return taskRunnerWorkItem.getResult();
       }
     }
   }
 
-  private void addPendingTaskToExecutor(final String taskId)
+  private void startPendingTaskHandling()
   {
-    pendingTasksExec.execute(
-        () -> {
-          while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
-            ImmutableWorkerInfo immutableWorker;
-            HttpRemoteTaskRunnerWorkItem taskItem = null;
+    for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+      pendingTasksExec.submit(
+          () -> {
             try {
-              synchronized (statusLock) {
-                taskItem = tasks.get(taskId);
+              if (!lifecycleLock.awaitStarted()) {
+                log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit();
+                return;
+              }
 
-                if (taskItem == null) {
-                  log.info(
-                      "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.",
-                      taskId
-                  );
-                  return;
-                }
+              pendingTasksExecutionLoop();
+            }
+            catch (Throwable t) {
+              log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run")
+                 .emit();
+            }
+            finally {
+              log.info("PendingTaskExecution loop exited.");
+            }
+          }
+      );
+    }
+  }
 
-                if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) {
-                  log.info(
-                      "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.",
-                      taskId,
-                      taskItem.getState()
-                  );
-                  return;
-                }
+  private void pendingTasksExecutionLoop()
+  {
+    while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
+      try {
+        // Find one pending task to run and a worker to run on
+        HttpRemoteTaskRunnerWorkItem taskItem = null;
+        ImmutableWorkerInfo immutableWorker = null;
 
-                if (taskItem.getTask() == null) {
-                  throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId);
-                }
-                immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
-                if (immutableWorker == null) {
-                  // no free worker, wait for some worker to become free
-                  statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
-                  continue;
-                } else if (workersWithUnacknowledgedTask.putIfAbsent(
-                    immutableWorker.getWorker().getHost(),
-                    taskId
-                ) != null) {
-                  // there was a race and someone else took this worker slot, try again
-                  continue;
-                }
-              }
+        synchronized (statusLock) {
+          Iterator<String> iter = pendingTaskIds.iterator();
+          while (iter.hasNext()) {
+            String taskId = iter.next();
+            HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+            if (ti == null || !ti.getState().isPending()) {
+              // happens if the task was shutdown, failed or observed running by a worker
+              iter.remove();
+              continue;
+            }
 
-              try {
-                // this will send HTTP request to worker for assigning task and hence kept
-                // outside the synchronized block.
-                if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) {
-                  return;
-                }
-              }
-              finally {
-                workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-                synchronized (statusLock) {
-                  statusLock.notifyAll();
-                }
-              }
+            if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+              // picked up by another pending task executor thread which is in the process of trying to
+              // run it on a worker, skip to next.
+              continue;
             }
-            catch (InterruptedException ex) {
-              log.info("Got InterruptedException while assigning task[%s].", taskId);
-              Thread.currentThread().interrupt();
 
-              return;
+            if (ti.getTask() == null) {
+              // this is not supposed to happen except for a bug, we want to mark this task failed but
+              // taskComplete(..) can not be called while holding statusLock. See the javadoc on that
+              // method.
+              // so this will get marked failed afterwards outside of current synchronized block.
+              taskItem = ti;
+              break;
             }
-            catch (Throwable th) {
-              log.makeAlert(th, "Exception while trying to assign task")
-                 .addData("taskId", taskId)
-                 .emit();
 
-              if (taskItem != null) {
-                taskComplete(taskItem, null, TaskStatus.failure(taskId));
-              }
+            immutableWorker = findWorkerToRunTask(ti.getTask());
+            if (immutableWorker == null) {
+              continue;
+            }
+
+            String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent(
+                immutableWorker.getWorker().getHost(),
+                taskId
+            );
+            if (prevUnackedTaskId != null) {
+              log.makeAlert(
+                  "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].",
+                  immutableWorker.getWorker().getHost(),
+                  prevUnackedTaskId,
+                  taskId
+              ).emit();
+            }
+
+            // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
+            // up by another task execution thread.
+            // note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
+            // back if this thread couldn't run this task for any reason, which we will know at some later time
+            // and also we will need to add it back to its old position in the list. that becomes complex quickly.
+            // Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
+            // And, it is automatically removed by any of the task execution threads when they notice that
+            // ti.getState().isPending() is false (at the beginning of this loop)
+            ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
+            taskItem = ti;
+            break;
+          }
+
+          if (taskItem == null) {
+            // Either no pending task is found or no suitable worker is found for any of the pending tasks.
+            // statusLock.notifyAll() is called whenever a new task shows up or if there is a possibility for a task
+            // to successfully get worker to run, for example when a new worker shows up, a task slot opens up
+            // because some task completed etc.
+            statusLock.wait(TimeUnit.MINUTES.toMillis(1));
+            continue;
+          }
+        }
+
+        String taskId = taskItem.getTaskId();
+
+        if (taskItem.getTask() == null) {
+          log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
+          // taskComplete(..) must be called outside of statusLock, see comments on method.
+          taskComplete(taskItem, null, TaskStatus.failure(taskId));
+          continue;
+        }
+
+        if (immutableWorker == null) {
+          throw new ISE("WTH! NULL immutableWorker");
+        }
 
-              return;
+        try {
+          // this will send HTTP request to worker for assigning task
+          if (!runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) {
+            if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+              taskItem.revertStateFromPendingWorkerAssignToPending();
             }
           }
         }
-    );
+        catch (InterruptedException ex) {
+          log.info("Got InterruptedException while assigning task[%s].", taskId);
+          throw ex;
+        }
+        catch (Throwable th) {
+          log.makeAlert(th, "Exception while trying to assign task")
+             .addData("taskId", taskId)
+             .emit();
+
+          // taskComplete(..) must be called outside of statusLock, see comments on method.
+          taskComplete(taskItem, null, TaskStatus.failure(taskId));
+        }
+        finally {
+          synchronized (statusLock) {
+            workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
+            statusLock.notifyAll();
+          }
+        }
+
+      }
+      catch (InterruptedException ex) {
+        log.info("Interrupted, will Exit.");
+        Thread.currentThread().interrupt();
+      }
+      catch (Throwable th) {
+        log.makeAlert(th, "Unknown Exception while trying to assign tasks.").emit();
+      }
+    }
+  }
+
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
+   */
+  List<String> getPendingTasksList()
+  {
+    synchronized (statusLock) {
+      return ImmutableList.copyOf(pendingTaskIds);
+    }
   }
 
   @Override
@@ -1161,6 +1275,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
+  @SuppressWarnings("GuardedBy") // Read on tasks is safe
   public Collection<? extends TaskRunnerWorkItem> getRunningTasks()
   {
     return tasks.values()
@@ -1170,11 +1285,12 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
+  @SuppressWarnings("GuardedBy") // Read on tasks is safe
   public Collection<? extends TaskRunnerWorkItem> getPendingTasks()
   {
     return tasks.values()
                 .stream()
-                .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING)
+                .filter(item -> item.getState().isPending())
                 .collect(Collectors.toList());
   }
 
@@ -1186,6 +1302,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  @SuppressWarnings("GuardedBy") // Read on tasks is safe
   public Collection<? extends TaskRunnerWorkItem> getCompletedTasks()
   {
     return tasks.values()
@@ -1196,26 +1313,19 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
   @Nullable
   @Override
+  @SuppressWarnings("GuardedBy") // Read on tasks is safe
   public RunnerTaskState getRunnerTaskState(String taskId)
   {
     final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
     if (workItem == null) {
       return null;
     } else {
-      switch (workItem.state) {
-        case PENDING:
-          return RunnerTaskState.PENDING;
-        case RUNNING:
-          return RunnerTaskState.RUNNING;
-        case COMPLETE:
-          return RunnerTaskState.NONE;
-        default:
-          throw new ISE("Unknown state[%s]", workItem.state);
-      }
+      return workItem.getState().toRunnerTaskState();
     }
   }
 
   @Override
+  @SuppressWarnings("GuardedBy") // Read on tasks is safe
   public TaskLocation getTaskLocation(String taskId)
   {
     final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
@@ -1233,6 +1343,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     ).collect(Collectors.toList());
   }
 
+  /**
+   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only.
+   */
+  @SuppressWarnings("GuardedBy")
+  Map<String, String> getWorkersWithUnacknowledgedTasks()
+  {
+    return workersWithUnacknowledgedTask;
+  }
+
   @Override
   public Optional<ScalingStats> getScalingStats()
   {
@@ -1313,6 +1432,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           case RUNNING:
             switch (taskItem.getState()) {
               case PENDING:
+              case PENDING_WORKER_ASSIGN:
                 taskItem.setWorker(worker);
                 taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                 log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());
@@ -1361,6 +1481,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           case SUCCESS:
             switch (taskItem.getState()) {
               case PENDING:
+              case PENDING_WORKER_ASSIGN:
                 taskItem.setWorker(worker);
                 taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
                 log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost());
@@ -1413,6 +1534,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
 
     if (isTaskCompleted) {
+      // taskComplete(..) must be called outside of statusLock, see comments on method.
       taskComplete(taskItem, workerHolder, announcement.getTaskStatus());
     }
 
@@ -1430,15 +1552,35 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   {
     enum State
     {
-      PENDING(0),
-      RUNNING(1),
-      COMPLETE(2);
+      // Task has been given to HRTR, but a worker to run this task hasn't been identified yet.
+      PENDING(0, true, RunnerTaskState.PENDING),
+
+      // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet
+      // or worker hasn't acknowledged the task yet.
+      PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING),
 
-      int index;
+      RUNNING(2, false, RunnerTaskState.RUNNING),
+      COMPLETE(3, false, RunnerTaskState.NONE);
 
-      State(int index)
+      private final int index;
+      private final boolean isPending;
+      private final RunnerTaskState runnerTaskState;
+
+      State(int index, boolean isPending, RunnerTaskState runnerTaskState)
       {
         this.index = index;
+        this.isPending = isPending;
+        this.runnerTaskState = runnerTaskState;
+      }
+
+      boolean isPending()
+      {
+        return isPending;
+      }
+
+      RunnerTaskState toRunnerTaskState()
+      {
+        return runnerTaskState;
       }
     }
 
@@ -1478,6 +1620,7 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
       }
     }
 
+    @JsonProperty
     public State getState()
     {
       return state;
@@ -1499,7 +1642,25 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
           state
       );
 
+      setStateUnconditionally(state);
+    }
+
+    public void revertStateFromPendingWorkerAssignToPending()
+    {
+      Preconditions.checkState(
+          this.state == State.PENDING_WORKER_ASSIGN,
+          "Can't move state from [%s] to [%s]",
+          this.state,
+          State.PENDING
+      );
+
+      setStateUnconditionally(State.PENDING);
+    }
+
+    private void setStateUnconditionally(State state)
+    {
       if (log.isDebugEnabled()) {
+        // Exception is logged to know what led to this call.
         log.debug(
             new RuntimeException("Stacktrace..."),
             "Setting task[%s] work item state from [%s] to [%s].",
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java
index 0266ba4..fc2c5ce 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java
@@ -34,6 +34,8 @@ import javax.ws.rs.core.Response;
 
 /**
  * Collection of http endpoits to introspect state of HttpRemoteTaskRunner instance for debugging.
+ * Also, generic TaskRunner state can be introspected by the endpoints in
+ * {@link org.apache.druid.indexing.overlord.http.OverlordResource}
  */
 @Path("/druid-internal/v1/httpRemoteTaskRunner")
 @ResourceFilters(StateResourceFilter.class)
@@ -48,15 +50,42 @@ public class HttpRemoteTaskRunnerResource
   }
 
   @GET
+  @Path("/knownTasks")
   @Produces(MediaType.APPLICATION_JSON)
-  public Response getDebugInfo()
+  public Response getAllKnownTasks() // now served under /knownTasks; replaces getDebugInfo()
   {
     HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner();
     if (httpRemoteTaskRunner == null) {
       return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
     }
 
-    return Response.ok().entity(httpRemoteTaskRunner.getDebugInfo()).build();
+    return Response.ok().entity(httpRemoteTaskRunner.getKnownTasks()).build();
+  }
+
+  @GET
+  @Path("/pendingTasksQueue")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getPendingTasksQueue()
+  {
+    // 403 FORBIDDEN when there is no HttpRemoteTaskRunner; otherwise its pending task list as JSON.
+    final HttpRemoteTaskRunner runner = getHttpRemoteTaskRunner();
+    if (runner != null) {
+      return Response.ok().entity(runner.getPendingTasksList()).build();
+    }
+    return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+  }
+
+  @GET
+  @Path("/workerSyncerDebugInfo")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getWorkerSyncerDebugInfo()
+  {
+    // 403 FORBIDDEN when there is no HttpRemoteTaskRunner; otherwise per-worker syncer debug info as JSON.
+    final HttpRemoteTaskRunner runner = getHttpRemoteTaskRunner();
+    if (runner != null) {
+      return Response.ok().entity(runner.getWorkerSyncerDebugInfo()).build();
+    }
+    return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
   }
 
   @GET
@@ -72,6 +101,45 @@ public class HttpRemoteTaskRunnerResource
     return Response.ok().entity(httpRemoteTaskRunner.getBlacklistedWorkers()).build();
   }
 
+  @GET
+  @Path("/lazyWorkers")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getLazyWorkers()
+  {
+    // 403 FORBIDDEN when there is no HttpRemoteTaskRunner; otherwise its lazy workers as JSON.
+    final HttpRemoteTaskRunner runner = getHttpRemoteTaskRunner();
+    if (runner != null) {
+      return Response.ok().entity(runner.getLazyWorkers()).build();
+    }
+    return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+  }
+
+  @GET
+  @Path("/workersWithUnacknowledgedTasks")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getWorkersWithUnacknowledgedTasks()
+  {
+    // 403 FORBIDDEN when there is no HttpRemoteTaskRunner; otherwise workers with unacknowledged tasks as JSON.
+    final HttpRemoteTaskRunner runner = getHttpRemoteTaskRunner();
+    if (runner != null) {
+      return Response.ok().entity(runner.getWorkersWithUnacknowledgedTasks()).build();
+    }
+    return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+  }
+
+  @GET
+  @Path("/workersEilgibleToRunTasks")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getWorkersEilgibleToRunTasks()
+  {
+    // NOTE(review): "Eilgible" misspells "Eligible"; path/name left as-is since they are the exposed API.
+    final HttpRemoteTaskRunner runner = getHttpRemoteTaskRunner();
+    if (runner != null) {
+      return Response.ok().entity(runner.getWorkersEligibleToRunTasks()).build();
+    }
+    return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
+  }
+
   private HttpRemoteTaskRunner getHttpRemoteTaskRunner()
   {
     Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
index cfb6180..630b0f0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java
@@ -332,6 +332,22 @@ public class WorkerHolder
     }
   }
 
+  public boolean isInitialized()
+  {
+    try {
+      return syncer.awaitInitialization(1, TimeUnit.MILLISECONDS); // presumably a timed wait of at most 1 ms
+    }
+    catch (InterruptedException interrupted) {
+      Thread.currentThread().interrupt(); // re-assert the interrupt rather than swallowing it
+    }
+    return false; // reached only when the wait above was interrupted
+  }
+
+  public boolean isEnabled()
+  {
+    return !disabled.get(); // enabled exactly when the 'disabled' flag is not set
+  }
+
   public ChangeRequestHttpSyncer<WorkerHistoryItem> getUnderlyingSyncer()
   {
     return syncer;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 745f3e5..fe1df77 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -1350,6 +1350,12 @@ public class HttpRemoteTaskRunnerTest
       }
 
       @Override
+      public boolean isInitialized()
+      {
+        return true; // test stub: always reports initialized, never waits on a real syncer
+      }
+
+      @Override
       public void waitForInitialization()
       {
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org