You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/02/20 05:07:28 UTC

[incubator-druid] branch 0.14.0-incubating updated: Missing Overlord and MiddleManager api docs (#7042) (#7106)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new 2f4acf5  Missing Overlord and MiddleManager api docs (#7042) (#7106)
2f4acf5 is described below

commit 2f4acf5c740785721039e28ee93cf6f030ec6a86
Author: Clint Wylie <cj...@gmail.com>
AuthorDate: Tue Feb 19 21:07:22 2019 -0800

    Missing Overlord and MiddleManager api docs (#7042) (#7106)
    
    * document middle manager api
    
    * re-arrange
    
    * correction
    
    * document more missing overlord api calls, minor re-arrange of some code i was referencing
    
    * fix it
    
    * this will fix it
    
    * fixup
    
    * link to other docs
---
 .../org/apache/druid/indexer/TaskStatusPlus.java   |   4 -
 docs/content/operations/api-reference.md           | 183 ++++++++--
 .../indexing/overlord/http/OverlordResource.java   | 400 ++++++++++-----------
 3 files changed, 351 insertions(+), 236 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
index 4912900..34733af 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.java.util.common.RE;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -31,8 +30,6 @@ import java.util.Objects;
 
 public class TaskStatusPlus
 {
-  private static final Logger log = new Logger(TaskStatusPlus.class);
-
   private final String id;
   private final String type;
   private final DateTime createdTime;
@@ -74,7 +71,6 @@ public class TaskStatusPlus
     );
   }
 
-
   @JsonCreator
   public TaskStatusPlus(
       @JsonProperty("id") String id,
diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md
index 23d1188..407b473 100644
--- a/docs/content/operations/api-reference.md
+++ b/docs/content/operations/api-reference.md
@@ -143,14 +143,17 @@ Returns full segment metadata for a specific segment as stored in the metadata s
 
 * `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments`
 
-Returns a list of all segments, overlapping with any of given intervals,  for a datasource as stored in the metadata store. Request body is array of string intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
+Returns a list of all segments, overlapping with any of the given intervals, for a datasource as stored in the metadata store. The request body is an array of ISO 8601 interval strings like [interval1, interval2,...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
 
 * `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full`
 
-Returns a list of all segments, overlapping with any of given intervals, for a datasource with the full segment metadata as stored in the metadata store. Request body is array of string intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
+Returns a list of all segments, overlapping with any of the given intervals, for a datasource with the full segment metadata as stored in the metadata store. The request body is an array of ISO 8601 interval strings like [interval1, interval2,...], for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"]
 
 #### Datasources
 
+Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` 
+(e.g., 2016-06-27_2016-06-28).
+
 ##### GET
 
 * `/druid/coordinator/v1/datasources`
@@ -187,7 +190,7 @@ Returns a map of an interval to a map of segment metadata to a set of server nam
 
 * `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}`
 
-Returns a set of segment ids for an ISO8601 interval. Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28).
+Returns a set of segment ids for an interval.
 
 * `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}?simple`
 
@@ -234,18 +237,19 @@ Enables a segment of a datasource.
 Disables a datasource.
 
 * `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}`
-* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myISO8601Interval}`
+* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myInterval}`
 
 Runs a [Kill task](../ingestion/tasks.html) for a given interval and datasource.
 
-Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28).
-
 * `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}`
 
 Disables a segment.
 
 #### Retention Rules
 
+Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` 
+(e.g., 2016-06-27_2016-06-28).
+
 ##### GET
 
 * `/druid/coordinator/v1/rules`
@@ -292,9 +296,10 @@ Optional Header Parameters for auditing the config change can also be specified.
 
 #### Intervals
 
-##### GET
+Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` 
+(e.g., 2016-06-27_2016-06-28).
 
-Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28).
+##### GET
 
 * `/druid/coordinator/v1/intervals`
 
@@ -338,6 +343,7 @@ will be set for them.
 
 Creates or updates the compaction config for a dataSource. See [Compaction Configuration](../configuration/index.html#compaction-dynamic-configuration) for configuration details.
 
+
 ##### DELETE
 
 * `/druid/coordinator/v1/config/compaction/{dataSource}`
@@ -357,12 +363,12 @@ ports.
 * `/druid/coordinator/v1/servers?simple`
 
 Returns a list of server data objects in which each object has the following keys:
-- `host`: host URL include (`{hostname}:{port}`)
-- `type`: node type (`indexer-executor`, `historical`)
-- `currSize`: storage size currently used
-- `maxSize`: maximum storage size
-- `priority`
-- `tier`
+* `host`: host URL including port (`{hostname}:{port}`)
+* `type`: node type (`indexer-executor`, `historical`)
+* `currSize`: storage size currently used
+* `maxSize`: maximum storage size
+* `priority`
+* `tier`
 
 ### Overlord
 
@@ -382,8 +388,44 @@ only want the active leader to be considered in-service at the load balancer.
 
 #### Tasks<a name="overlord-tasks"></a> 
 
+Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` 
+(e.g., 2016-06-27_2016-06-28).
+
 ##### GET
 
+* `/druid/indexer/v1/tasks`
+
+Retrieve a list of tasks. Accepts query string parameters `state`, `datasource`, `createdTimeInterval`, `max`, and `type`.
+
+|Query Parameter |Description |
+|---|---|
+|`state`|filter list of tasks by task state, valid options are `running`, `complete`, `waiting`, and `pending`.|
+| `datasource`| return tasks filtered by Druid datasource.|
+| `createdTimeInterval`| return tasks created within the specified interval. | 
+| `max`| maximum number of `"complete"` tasks to return. Only applies when `state` is set to `"complete"`.|
+| `type`| filter tasks by task type. See [task documentation](../ingestion/tasks.html) for more details.|
+
+
+* `/druid/indexer/v1/completeTasks`
+
+Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=complete`.
+
+* `/druid/indexer/v1/runningTasks`
+
+Retrieve list of running tasks. Equivalent to `/druid/indexer/v1/tasks?state=running`.
+
+* `/druid/indexer/v1/waitingTasks`
+
+Retrieve list of waiting tasks. Equivalent to `/druid/indexer/v1/tasks?state=waiting`.
+
+* `/druid/indexer/v1/pendingTasks`
+
+Retrieve list of pending tasks. Equivalent to `/druid/indexer/v1/tasks?state=pending`.
+
+* `/druid/indexer/v1/task/{taskId}`
+
+Retrieve the 'payload' of a task.
+
 * `/druid/indexer/v1/task/{taskId}/status`
 
 Retrieve the status of a task.
@@ -406,14 +448,27 @@ Retrieve a [task completion report](../ingestion/reports.html) for a task. Only
 
 Endpoint for submitting tasks and supervisor specs to the Overlord. Returns the taskId of the submitted task.
 
-* `druid/indexer/v1/task/{taskId}/shutdown`
+* `/druid/indexer/v1/task/{taskId}/shutdown`
 
 Shuts down a task.
 
-* `druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks`
+* `/druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks`
 
 Shuts down all tasks for a dataSource.
 
+* `/druid/indexer/v1/taskStatus`
+
+Retrieve a list of task status objects for the list of task ID strings in the request body.
+
+##### DELETE
+
+* `/druid/indexer/v1/pendingSegments/{dataSource}`
+
+Manually clean up the pending segments table in metadata storage for `dataSource`. Returns a JSON object response with 
+the key `numDeleted`, whose value is the count of rows deleted from the pending segments table. This API is used by the 
+`druid.coordinator.kill.pendingSegments.on` [coordinator setting](../configuration/index.html#coordinator-operation)
+which automates this operation to run periodically.
+
 #### Supervisors
 
 ##### GET
@@ -490,13 +545,94 @@ This API is deprecated and will be removed in future releases.
 Please use the equivalent 'terminate' instead.
 </div>
 
+#### Dynamic Configuration
+See [Overlord Dynamic Configuration](../configuration/index.html#overlord-dynamic-configuration) for details. 
+
+Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` 
+(e.g., 2016-06-27_2016-06-28).
+
+##### GET
+
+* `/druid/indexer/v1/worker`
+
+Retrieves the current Overlord dynamic configuration. 
+
+* `/druid/indexer/v1/worker/history?interval={interval}&count={count}`
+
+Retrieves the history of changes to the Overlord dynamic configuration. Accepts `interval` and `count` query string parameters 
+to filter by interval and limit the number of results, respectively.
+
+* `/druid/indexer/v1/scaling`
+
+Retrieves overlord scaling events if auto-scaling runners are in use.
+
+##### POST
+
+* `/druid/indexer/v1/worker`
+
+Update overlord dynamic worker configuration.
+
 ## Data Server
 
-This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) in the suggested [three-server configuration](../design/processes.html#server-types).
+This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) 
+in the suggested [three-server configuration](../design/processes.html#server-types).
 
 ### MiddleManager
 
-The MiddleManager does not have any API endpoints beyond the [common endpoints](#common).
+##### GET
+
+* `/druid/worker/v1/enabled`
+
+Check whether a MiddleManager is in an enabled or disabled state. Returns a JSON object keyed by the combined `druid.host` 
+and `druid.port` with the boolean state as the value.
+
+```json
+{"localhost:8091":true}
+```
+
+* `/druid/worker/v1/tasks`
+
+Retrieve a list of active tasks being run on the MiddleManager. Returns a JSON list of task ID strings. Normal usage should 
+prefer to use the `/druid/indexer/v1/tasks` [Overlord API](#overlord) or one of its task-state-specific variants instead.
+
+```json
+["index_wikiticker_2019-02-11T02:20:15.316Z"]
+```
+
+* `/druid/worker/v1/task/{taskid}/log` 
+
+Retrieve task log output stream by task id. Normal usage should prefer to use the `/druid/indexer/v1/task/{taskId}/log`
+[Overlord API](#overlord) instead.
+
+##### POST
+
+* `/druid/worker/v1/disable`
+
+'Disable' a MiddleManager, causing it to stop accepting new tasks but complete all existing tasks. Returns a JSON object 
+keyed by the combined `druid.host` and `druid.port`:
+
+```json
+{"localhost:8091":"disabled"}
+```
+
+* `/druid/worker/v1/enable`
+
+'Enable' a MiddleManager, allowing it to accept new tasks again if it was previously disabled. Returns a JSON object 
+keyed by the combined `druid.host` and `druid.port`:
+
+```json
+{"localhost:8091":"enabled"}
+```
+
+* `/druid/worker/v1/task/{taskid}/shutdown`
+
+Shut down a running task by `taskid`. Normal usage should prefer to use the `/druid/indexer/v1/task/{taskId}/shutdown` 
+[Overlord API](#overlord) instead. Returns JSON:
+
+```json
+{"task":"index_kafka_wikiticker_f7011f8ffba384b_fpeclode"}
+```
+
 
 ### Peon
 
@@ -536,6 +672,9 @@ This section documents the API endpoints for the processes that reside on Query
 
 #### Datasource Information
 
+Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` 
+(e.g., 2016-06-27_2016-06-28).
+
 ##### GET
 
 * `/druid/v2/datasources`
@@ -546,7 +685,7 @@ Returns a list of queryable datasources.
 
 Returns the dimensions and metrics of the datasource. Optionally, you can provide request parameter "full" to get list of served intervals with dimensions and metrics being served for those intervals. You can also provide request param "interval" explicitly to refer to a particular interval.
 
-If no interval is specified, a default interval spanning a configurable period before the current time will be used. The duration of this interval is specified in ISO8601 format via:
+If no interval is specified, a default interval spanning a configurable period before the current time will be used. The default duration of this interval is specified in ISO 8601 duration format via:
 
 druid.query.segmentMetadata.defaultHistory
 
@@ -555,7 +694,7 @@ druid.query.segmentMetadata.defaultHistory
 Returns the dimensions of the datasource.
 
 <div class="note caution">
-This API is deprecated and will be removed in future releases. Please use [SegmentMetadataQuery](../querying/segmentmetadataquery.html) instead
+This API is deprecated and will be removed in future releases. Please use <a href="../querying/segmentmetadataquery.html">SegmentMetadataQuery</a> instead
 which provides more comprehensive information and supports all dataSource types including streaming dataSources. It's also encouraged to use [INFORMATION_SCHEMA tables](../querying/sql.html#retrieving-metadata)
 if you're using SQL.
 </div>
@@ -565,12 +704,12 @@ if you're using SQL.
 Returns the metrics of the datasource.
 
 <div class="note caution">
-This API is deprecated and will be removed in future releases. Please use [SegmentMetadataQuery](../querying/segmentmetadataquery.html) instead
+This API is deprecated and will be removed in future releases. Please use <a href="../querying/segmentmetadataquery.html">SegmentMetadataQuery</a> instead
 which provides more comprehensive information and supports all dataSource types including streaming dataSources. It's also encouraged to use [INFORMATION_SCHEMA tables](../querying/sql.html#retrieving-metadata)
 if you're using SQL.
 </div>
 
-* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals-in-ISO8601-format}&numCandidates={numCandidates}`
+* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals}&numCandidates={numCandidates}`
 
 Returns segment information lists including server locations for the given datasource and intervals. If "numCandidates" is not specified, it will return all servers for each interval.
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 9b59202..e5abac4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -101,7 +101,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 /**
@@ -122,7 +121,6 @@ public class OverlordResource
   private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
   private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
 
-
   @Inject
   public OverlordResource(
       TaskMaster taskMaster,
@@ -503,100 +501,6 @@ public class OverlordResource
     return getTasks("waiting", null, null, null, null, req);
   }
 
-  private static class AnyTask extends TaskRunnerWorkItem
-  {
-    private final String taskType;
-    private final String dataSource;
-    private final TaskState taskState;
-    private final RunnerTaskState runnerTaskState;
-    private final DateTime createdTime;
-    private final DateTime queueInsertionTime;
-    private final TaskLocation taskLocation;
-
-    AnyTask(
-        String taskId,
-        String taskType,
-        ListenableFuture<TaskStatus> result,
-        String dataSource,
-        TaskState state,
-        RunnerTaskState runnerState,
-        DateTime createdTime,
-        DateTime queueInsertionTime,
-        TaskLocation taskLocation
-    )
-    {
-      super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
-      this.taskType = taskType;
-      this.dataSource = dataSource;
-      this.taskState = state;
-      this.runnerTaskState = runnerState;
-      this.createdTime = createdTime;
-      this.queueInsertionTime = queueInsertionTime;
-      this.taskLocation = taskLocation;
-    }
-
-    @Override
-    public TaskLocation getLocation()
-    {
-      return taskLocation;
-    }
-
-    @Override
-    public String getTaskType()
-    {
-      return taskType;
-    }
-
-    @Override
-    public String getDataSource()
-    {
-      return dataSource;
-    }
-
-    public TaskState getTaskState()
-    {
-      return taskState;
-    }
-
-    public RunnerTaskState getRunnerTaskState()
-    {
-      return runnerTaskState;
-    }
-
-    @Override
-    public DateTime getCreatedTime()
-    {
-      return createdTime;
-    }
-
-    @Override
-    public DateTime getQueueInsertionTime()
-    {
-      return queueInsertionTime;
-    }
-
-    public AnyTask withTaskState(
-        TaskState newTaskState,
-        RunnerTaskState runnerState,
-        DateTime createdTime,
-        DateTime queueInsertionTime,
-        TaskLocation taskLocation
-    )
-    {
-      return new AnyTask(
-          getTaskId(),
-          getTaskType(),
-          getResult(),
-          getDataSource(),
-          newTaskState,
-          runnerState,
-          createdTime,
-          queueInsertionTime,
-          taskLocation
-      );
-    }
-  }
-
   @GET
   @Path("/pendingTasks")
   @Produces(MediaType.APPLICATION_JSON)
@@ -760,120 +664,6 @@ public class OverlordResource
     return Response.ok(authorizedList).build();
   }
 
-  private static BiFunction<TaskInfo<Task, TaskStatus>, RunnerTaskState, TaskStatusPlus> newTaskInfo2TaskStatusPlusFn()
-  {
-    return (taskInfo, runnerTaskState) -> new TaskStatusPlus(
-        taskInfo.getId(),
-        taskInfo.getTask() == null ? null : taskInfo.getTask().getType(),
-        taskInfo.getCreatedTime(),
-        // Would be nice to include the real queue insertion time, but the
-        // TaskStorage API doesn't yet allow it.
-        DateTimes.EPOCH,
-        taskInfo.getStatus().getStatusCode(),
-        runnerTaskState,
-        taskInfo.getStatus().getDuration(),
-        TaskLocation.unknown(),
-        taskInfo.getDataSource(),
-        taskInfo.getStatus().getErrorMsg()
-    );
-  }
-
-  private List<AnyTask> filterActiveTasks(
-      RunnerTaskState state,
-      List<AnyTask> allTasks
-  )
-  {
-    //divide active tasks into 3 lists : running, pending, waiting
-    Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
-    if (!taskRunnerOpt.isPresent()) {
-      throw new WebApplicationException(
-          Response.serverError().entity("No task runner found").build()
-      );
-    }
-    TaskRunner runner = taskRunnerOpt.get();
-    // the order of tasks below is waiting, pending, running to prevent
-    // skipping a task, it's the order in which tasks will change state
-    // if they do while this is code is executing, so a task might be
-    // counted twice but never skipped
-    if (RunnerTaskState.WAITING.equals(state)) {
-      Collection<? extends TaskRunnerWorkItem> runnersKnownTasks = runner.getKnownTasks();
-      Set<String> runnerKnownTaskIds = runnersKnownTasks
-          .stream()
-          .map(TaskRunnerWorkItem::getTaskId)
-          .collect(Collectors.toSet());
-      final List<AnyTask> waitingTasks = new ArrayList<>();
-      for (TaskRunnerWorkItem task : allTasks) {
-        if (!runnerKnownTaskIds.contains(task.getTaskId())) {
-          waitingTasks.add(((AnyTask) task).withTaskState(
-              TaskState.RUNNING,
-              RunnerTaskState.WAITING,
-              task.getCreatedTime(),
-              task.getQueueInsertionTime(),
-              task.getLocation()
-          ));
-        }
-      }
-      return waitingTasks;
-    }
-
-    if (RunnerTaskState.PENDING.equals(state)) {
-      Collection<? extends TaskRunnerWorkItem> knownPendingTasks = runner.getPendingTasks();
-      Set<String> pendingTaskIds = knownPendingTasks
-          .stream()
-          .map(TaskRunnerWorkItem::getTaskId)
-          .collect(Collectors.toSet());
-      Map<String, TaskRunnerWorkItem> workItemIdMap = knownPendingTasks
-          .stream()
-          .collect(Collectors.toMap(
-              TaskRunnerWorkItem::getTaskId,
-              java.util.function.Function.identity(),
-              (previousWorkItem, newWorkItem) -> newWorkItem
-          ));
-      final List<AnyTask> pendingTasks = new ArrayList<>();
-      for (TaskRunnerWorkItem task : allTasks) {
-        if (pendingTaskIds.contains(task.getTaskId())) {
-          pendingTasks.add(((AnyTask) task).withTaskState(
-              TaskState.RUNNING,
-              RunnerTaskState.PENDING,
-              workItemIdMap.get(task.getTaskId()).getCreatedTime(),
-              workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
-              workItemIdMap.get(task.getTaskId()).getLocation()
-          ));
-        }
-      }
-      return pendingTasks;
-    }
-
-    if (RunnerTaskState.RUNNING.equals(state)) {
-      Collection<? extends TaskRunnerWorkItem> knownRunningTasks = runner.getRunningTasks();
-      Set<String> runningTaskIds = knownRunningTasks
-          .stream()
-          .map(TaskRunnerWorkItem::getTaskId)
-          .collect(Collectors.toSet());
-      Map<String, TaskRunnerWorkItem> workItemIdMap = knownRunningTasks
-          .stream()
-          .collect(Collectors.toMap(
-              TaskRunnerWorkItem::getTaskId,
-              java.util.function.Function.identity(),
-              (previousWorkItem, newWorkItem) -> newWorkItem
-          ));
-      final List<AnyTask> runningTasks = new ArrayList<>();
-      for (TaskRunnerWorkItem task : allTasks) {
-        if (runningTaskIds.contains(task.getTaskId())) {
-          runningTasks.add(((AnyTask) task).withTaskState(
-              TaskState.RUNNING,
-              RunnerTaskState.RUNNING,
-              workItemIdMap.get(task.getTaskId()).getCreatedTime(),
-              workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
-              workItemIdMap.get(task.getTaskId()).getLocation()
-          ));
-        }
-      }
-      return runningTasks;
-    }
-    return allTasks;
-  }
-
   @DELETE
   @Path("/pendingSegments/{dataSource}")
   @Produces(MediaType.APPLICATION_JSON)
@@ -1016,6 +806,102 @@ public class OverlordResource
     }
   }
 
+  private List<AnyTask> filterActiveTasks(
+      RunnerTaskState state,
+      List<AnyTask> allTasks
+  )
+  {
+    //divide active tasks into 3 lists : running, pending, waiting
+    Optional<TaskRunner> taskRunnerOpt = taskMaster.getTaskRunner();
+    if (!taskRunnerOpt.isPresent()) {
+      throw new WebApplicationException(
+          Response.serverError().entity("No task runner found").build()
+      );
+    }
+    TaskRunner runner = taskRunnerOpt.get();
+    // the order of tasks below is waiting, pending, running to prevent
+    // skipping a task, it's the order in which tasks will change state
+    // if they do while this is code is executing, so a task might be
+    // counted twice but never skipped
+    if (RunnerTaskState.WAITING.equals(state)) {
+      Collection<? extends TaskRunnerWorkItem> runnersKnownTasks = runner.getKnownTasks();
+      Set<String> runnerKnownTaskIds = runnersKnownTasks
+          .stream()
+          .map(TaskRunnerWorkItem::getTaskId)
+          .collect(Collectors.toSet());
+      final List<AnyTask> waitingTasks = new ArrayList<>();
+      for (TaskRunnerWorkItem task : allTasks) {
+        if (!runnerKnownTaskIds.contains(task.getTaskId())) {
+          waitingTasks.add(((AnyTask) task).withTaskState(
+              TaskState.RUNNING,
+              RunnerTaskState.WAITING,
+              task.getCreatedTime(),
+              task.getQueueInsertionTime(),
+              task.getLocation()
+          ));
+        }
+      }
+      return waitingTasks;
+    }
+
+    if (RunnerTaskState.PENDING.equals(state)) {
+      Collection<? extends TaskRunnerWorkItem> knownPendingTasks = runner.getPendingTasks();
+      Set<String> pendingTaskIds = knownPendingTasks
+          .stream()
+          .map(TaskRunnerWorkItem::getTaskId)
+          .collect(Collectors.toSet());
+      Map<String, TaskRunnerWorkItem> workItemIdMap = knownPendingTasks
+          .stream()
+          .collect(Collectors.toMap(
+              TaskRunnerWorkItem::getTaskId,
+              java.util.function.Function.identity(),
+              (previousWorkItem, newWorkItem) -> newWorkItem
+          ));
+      final List<AnyTask> pendingTasks = new ArrayList<>();
+      for (TaskRunnerWorkItem task : allTasks) {
+        if (pendingTaskIds.contains(task.getTaskId())) {
+          pendingTasks.add(((AnyTask) task).withTaskState(
+              TaskState.RUNNING,
+              RunnerTaskState.PENDING,
+              workItemIdMap.get(task.getTaskId()).getCreatedTime(),
+              workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
+              workItemIdMap.get(task.getTaskId()).getLocation()
+          ));
+        }
+      }
+      return pendingTasks;
+    }
+
+    if (RunnerTaskState.RUNNING.equals(state)) {
+      Collection<? extends TaskRunnerWorkItem> knownRunningTasks = runner.getRunningTasks();
+      Set<String> runningTaskIds = knownRunningTasks
+          .stream()
+          .map(TaskRunnerWorkItem::getTaskId)
+          .collect(Collectors.toSet());
+      Map<String, TaskRunnerWorkItem> workItemIdMap = knownRunningTasks
+          .stream()
+          .collect(Collectors.toMap(
+              TaskRunnerWorkItem::getTaskId,
+              java.util.function.Function.identity(),
+              (previousWorkItem, newWorkItem) -> newWorkItem
+          ));
+      final List<AnyTask> runningTasks = new ArrayList<>();
+      for (TaskRunnerWorkItem task : allTasks) {
+        if (runningTaskIds.contains(task.getTaskId())) {
+          runningTasks.add(((AnyTask) task).withTaskState(
+              TaskState.RUNNING,
+              RunnerTaskState.RUNNING,
+              workItemIdMap.get(task.getTaskId()).getCreatedTime(),
+              workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(),
+              workItemIdMap.get(task.getTaskId()).getLocation()
+          ));
+        }
+      }
+      return runningTasks;
+    }
+    return allTasks;
+  }
+
   private List<TaskStatusPlus> securedTaskStatusPlus(
       List<TaskStatusPlus> collectionToFilter,
       @Nullable String dataSource,
@@ -1057,4 +943,98 @@ public class OverlordResource
         )
     );
   }
+
+  private static class AnyTask extends TaskRunnerWorkItem
+  {
+    private final String taskType;
+    private final String dataSource;
+    private final TaskState taskState;
+    private final RunnerTaskState runnerTaskState;
+    private final DateTime createdTime;
+    private final DateTime queueInsertionTime;
+    private final TaskLocation taskLocation;
+
+    AnyTask(
+        String taskId,
+        String taskType,
+        ListenableFuture<TaskStatus> result,
+        String dataSource,
+        TaskState state,
+        RunnerTaskState runnerState,
+        DateTime createdTime,
+        DateTime queueInsertionTime,
+        TaskLocation taskLocation
+    )
+    {
+      super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
+      this.taskType = taskType;
+      this.dataSource = dataSource;
+      this.taskState = state;
+      this.runnerTaskState = runnerState;
+      this.createdTime = createdTime;
+      this.queueInsertionTime = queueInsertionTime;
+      this.taskLocation = taskLocation;
+    }
+
+    @Override
+    public TaskLocation getLocation()
+    {
+      return taskLocation;
+    }
+
+    @Override
+    public String getTaskType()
+    {
+      return taskType;
+    }
+
+    @Override
+    public String getDataSource()
+    {
+      return dataSource;
+    }
+
+    public TaskState getTaskState()
+    {
+      return taskState;
+    }
+
+    public RunnerTaskState getRunnerTaskState()
+    {
+      return runnerTaskState;
+    }
+
+    @Override
+    public DateTime getCreatedTime()
+    {
+      return createdTime;
+    }
+
+    @Override
+    public DateTime getQueueInsertionTime()
+    {
+      return queueInsertionTime;
+    }
+
+    public AnyTask withTaskState(
+        TaskState newTaskState,
+        RunnerTaskState runnerState,
+        DateTime createdTime,
+        DateTime queueInsertionTime,
+        TaskLocation taskLocation
+    )
+    {
+      return new AnyTask(
+          getTaskId(),
+          getTaskType(),
+          getResult(),
+          getDataSource(),
+          newTaskState,
+          runnerState,
+          createdTime,
+          queueInsertionTime,
+          taskLocation
+      );
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org