Posted to commits@druid.apache.org by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org> on 2024/01/19 06:58:14 UTC

[PR] Try to fetch the task status for an active from memory (druid)

AmatyaAvadhanula opened a new pull request, #15724:
URL: https://github.com/apache/druid/pull/15724

   Use the Overlord's in-memory state for active tasks to reduce metadata store calls when fetching their statuses.
   
   This PR has:
   
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1461495682


##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java:
##########
@@ -143,7 +143,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       false,
       false
   );
-  private static final String TOPIC_PREFIX = "testTopic";
+  private static final String TOPIC_PREFIX = "testTopi";

Review Comment:
   Why this change? 





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1479394368


##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java:
##########
@@ -143,7 +143,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
       false,
       false
   );
-  private static final String TOPIC_PREFIX = "testTopic";
+  private static final String TOPIC_PREFIX = "testTopi";

Review Comment:
   Sorry, that was accidental





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1501707598


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -469,9 +469,15 @@ public Response getMultipleTaskStatuses(Set<String> taskIds)
       return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();
     }
 
+    final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
     for (String taskId : taskIds) {
-      Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId);
+      final Optional<TaskStatus> optional;
+      if (taskQueue.isPresent()) {

Review Comment:
   Ah, I see, I didn't realize that the fallback logic was in the `TaskQueue` method. In that case this code is OK.





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1496214158


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -469,9 +469,15 @@ public Response getMultipleTaskStatuses(Set<String> taskIds)
       return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();
     }
 
+    final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
     for (String taskId : taskIds) {
-      Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId);
+      final Optional<TaskStatus> optional;
+      if (taskQueue.isPresent()) {

Review Comment:
   If the task is not present in the `taskQueue`, we should then check the `taskStorageQueryAdapter`. It might be already complete.
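
The lookup order requested in this comment can be sketched as follows. This is a minimal illustration under stated assumptions, not Druid's implementation: `StatusLookupSketch`, its two maps, and the string statuses are hypothetical stand-ins for the in-memory task state and the metadata store, and it uses `java.util.Optional` where the Druid code uses Guava's `Optional`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the lookup order discussed above; none of these names are Druid's. */
class StatusLookupSketch
{
  // In-memory view: statuses of tasks that are currently active.
  private final Map<String, String> activeStatuses = new HashMap<>();
  // Simulated metadata store: persisted statuses, including completed tasks.
  private final Map<String, String> storedStatuses = new HashMap<>();
  // Number of simulated metadata store reads, to show what the in-memory path saves.
  private int metadataReads = 0;

  void trackActive(String taskId)
  {
    activeStatuses.put(taskId, "RUNNING");
    storedStatuses.put(taskId, "RUNNING");
  }

  void markComplete(String taskId, String finalStatus)
  {
    // A completed task leaves the in-memory view but stays in the store.
    activeStatuses.remove(taskId);
    storedStatuses.put(taskId, finalStatus);
  }

  Optional<String> getStatus(String taskId)
  {
    // Active tasks are answered from memory, without a metadata read.
    final String active = activeStatuses.get(taskId);
    if (active != null) {
      return Optional.of(active);
    }
    // Otherwise fall back to the store: the task may already be complete.
    metadataReads++;
    return Optional.ofNullable(storedStatuses.get(taskId));
  }

  int getMetadataReads()
  {
    return metadataReads;
  }
}
```

In this sketch an active task never touches the simulated store, while a completed or unknown task costs exactly one read.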



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java:
##########
@@ -65,10 +65,13 @@ public Map<String, TaskStatus> statuses(Set<String> taskIds)
   @Override
   public TaskLocation location(String workerId)
   {
-    final TaskStatusResponse response = FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true);
+    final TaskStatus response = FutureUtils.getUnchecked(
+        overlordClient.taskStatuses(ImmutableSet.of(workerId)),
+        true
+    ).get(workerId);
 
-    if (response.getStatus() != null) {
-      return response.getStatus().getLocation();

Review Comment:
   We need the same change in `SpecificTaskServiceLocator`, to use `taskStatuses` instead of `taskStatus`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -894,7 +885,12 @@ public TaskLocation getTaskLocation(final String id)
       @Override
       public Optional<TaskStatus> getTaskStatus(String id)
       {
-        return taskStorage.getStatus(id);
+        final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();

Review Comment:
   If the task is not present in the `taskQueue`, we should then check the `taskStorageQueryAdapter`. It might be already complete.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -671,7 +672,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r
     // Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor
     // remembers that this task has completed.
     try {
-      final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
+      final Optional<TaskStatus> previousStatus = getTaskStatus(task.getId());

Review Comment:
   This really should use the metadata store. The code block is only called when a task completes, and we need to check to make sure the metadata store has the correct status stored.





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on PR #15724:
URL: https://github.com/apache/druid/pull/15724#issuecomment-1963323843

   Thank you for the reviews @gianm @abhishekagarwal87 
   I've fixed the coverage in one of the modules, but it's not straightforward in the other one.




Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1496788584


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -469,9 +469,15 @@ public Response getMultipleTaskStatuses(Set<String> taskIds)
       return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();
     }
 
+    final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     Map<String, TaskStatus> result = Maps.newHashMapWithExpectedSize(taskIds.size());
     for (String taskId : taskIds) {
-      Optional<TaskStatus> optional = taskStorageQueryAdapter.getStatus(taskId);
+      final Optional<TaskStatus> optional;
+      if (taskQueue.isPresent()) {

Review Comment:
   `TaskQueue#getTaskStatus` looks at the metadata store if the task is not running:
   
   ```
     public Optional<TaskStatus> getTaskStatus(final String taskId)
     {
       RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
       if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
         return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
       } else {
         return taskStorage.getStatus(taskId);
       }
     }
   ```
   
   Would it be better to move this fallback-to-DB logic out of `TaskQueue` and into the calling method?





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15724:
URL: https://github.com/apache/druid/pull/15724#issuecomment-1962747668

   There are some failures due to code coverage. @AmatyaAvadhanula please see if it's straightforward to add coverage, and if it isn't please let us know.




Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1502006220


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java:
##########
@@ -1794,6 +1794,82 @@
     Assert.assertEquals(expectedResourceActions, resourceActions);
   }
 
+  @Test
+  public void testGetMultipleTaskStatuses_presentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
+    EasyMock.expect(taskQueue.getTaskStatus("task"))
+            .andReturn(Optional.of(TaskStatus.running("task")));
+    EasyMock.replay(taskQueue);
+    TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);

Review Comment:
   ## Possible confusion of local and field
   
   Confusing name: method [testGetMultipleTaskStatuses_presentTaskQueue](1) also refers to field [taskMaster](2) (without qualifying it with 'this').
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6772)
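
The pattern this alert describes can be reproduced in isolation. The following is a minimal sketch, assuming a hypothetical class `ShadowingSketch` whose field and local share the name `taskMaster`; it only mirrors the shape of the flagged test and is not taken from it.

```java
/** Hypothetical class; the names only mirror the shape of the flagged test code. */
class ShadowingSketch
{
  // Field, as a test fixture might declare it.
  private final String taskMaster = "field";

  String readAll()
  {
    // Before the local is declared, the unqualified name resolves to the field.
    final String before = taskMaster;
    // From this declaration on, the local shadows the field.
    final String taskMaster = "local";
    // The unqualified name is now the local; 'this.' still reaches the field.
    return before + "," + taskMaster + "," + this.taskMaster;
  }
}
```

Giving the local a distinct name, or qualifying field accesses with `this.`, removes the ambiguity for a reader.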



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java:
##########
@@ -1794,6 +1794,82 @@
     Assert.assertEquals(expectedResourceActions, resourceActions);
   }
 
+  @Test
+  public void testGetMultipleTaskStatuses_presentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
+    EasyMock.expect(taskQueue.getTaskStatus("task"))
+            .andReturn(Optional.of(TaskStatus.running("task")));
+    EasyMock.replay(taskQueue);
+    TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
+    EasyMock.replay(taskMaster);
+    OverlordResource overlordResource = new OverlordResource(
+        taskMaster,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
+                                            .getEntity();
+    Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);
+  }
+
+  @Test
+  public void testGetMultipleTaskStatuses_absentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);
+    EasyMock.expect(taskStorageQueryAdapter.getStatus("task"))
+            .andReturn(Optional.of(TaskStatus.running("task")));
+    EasyMock.replay(taskStorageQueryAdapter);
+    TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);

Review Comment:
   ## Possible confusion of local and field
   
   Confusing name: method [testGetMultipleTaskStatuses_absentTaskQueue](1) also refers to field [taskMaster](2) (without qualifying it with 'this').
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6774)



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java:
##########
@@ -1794,6 +1794,82 @@
     Assert.assertEquals(expectedResourceActions, resourceActions);
   }
 
+  @Test
+  public void testGetMultipleTaskStatuses_presentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class);
+    EasyMock.expect(taskQueue.getTaskStatus("task"))
+            .andReturn(Optional.of(TaskStatus.running("task")));
+    EasyMock.replay(taskQueue);
+    TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
+    EasyMock.replay(taskMaster);
+    OverlordResource overlordResource = new OverlordResource(
+        taskMaster,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+    final Object response = overlordResource.getMultipleTaskStatuses(ImmutableSet.of("task"))
+                                            .getEntity();
+    Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response);
+  }
+
+  @Test
+  public void testGetMultipleTaskStatuses_absentTaskQueue()
+  {
+    // Needed for teardown
+    EasyMock.replay(
+        authConfig,
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class);

Review Comment:
   ## Possible confusion of local and field
   
   Confusing name: method [testGetMultipleTaskStatuses_absentTaskQueue](1) also refers to field [taskStorageQueryAdapter](2) (without qualifying it with 'this').
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6773)





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1498622977


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -671,7 +672,7 @@ private void notifyStatus(final Task task, final TaskStatus taskStatus, String r
     // Save status to metadata store first, so if we crash while doing the rest of the shutdown, our successor
     // remembers that this task has completed.
     try {
-      final Optional<TaskStatus> previousStatus = taskStorage.getStatus(task.getId());
+      final Optional<TaskStatus> previousStatus = getTaskStatus(task.getId());

Review Comment:
   Thanks, added a comment as well



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java:
##########
@@ -65,10 +65,13 @@ public Map<String, TaskStatus> statuses(Set<String> taskIds)
   @Override
   public TaskLocation location(String workerId)
   {
-    final TaskStatusResponse response = FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true);
+    final TaskStatus response = FutureUtils.getUnchecked(
+        overlordClient.taskStatuses(ImmutableSet.of(workerId)),
+        true
+    ).get(workerId);
 
-    if (response.getStatus() != null) {
-      return response.getStatus().getLocation();

Review Comment:
   Done





Re: [PR] Try to fetch the task status for an active from memory (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula merged PR #15724:
URL: https://github.com/apache/druid/pull/15724

