Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/07 04:19:02 UTC

[GitHub] [druid] kfaraz commented on a diff in pull request #11296: Fix future control bug for taskClient.pause

kfaraz commented on code in PR #11296:
URL: https://github.com/apache/druid/pull/11296#discussion_r963882323


##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java:
##########
@@ -628,6 +640,9 @@ public void testReplicas() throws Exception
         )
     ).anyTimes();
     EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
+    taskClient.cancelTaskPauseRequests();
+    EasyMock.expectLastCall().atLeastOnce();

Review Comment:
   Nit: Does this need to be verified in all the tests? Wouldn't just verifying it in some of the tests be enough?
   
   Same for the `KinesisSupervisorTest`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2824,60 +2824,81 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
       }
     }
 
-    List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures)
-                                                                    .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    for (int j = 0; j < results.size(); j++) {
-      Integer groupId = futureGroupIds.get(j);
-      TaskGroup group = activelyReadingTaskGroups.get(groupId);
-      Map<PartitionIdType, SequenceOffsetType> endOffsets = results.get(j);
-
-      if (endOffsets != null) {
-        // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
-        group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
+    try {
+      Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+    }
+    finally {
+      for (int i = 0; i < futureGroupIds.size(); ++i) {
+        Integer groupId = futureGroupIds.get(i);
+        ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = futures.get(i);
 
+        if (future.isDone()) {
+          log.info("GroupId [%d] checking task finished", groupId);
 
-        boolean endOffsetsAreInvalid = false;
-        for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
-          if (entry.getValue().equals(getEndOfPartitionMarker())) {
-            log.info(
-                "Got end of partition marker for partition [%s] in checkTaskDuration, not updating partition offset.",
-                entry.getKey()
-            );
-            endOffsetsAreInvalid = true;
+          try {
+            this.moveGroupFromReadingToPending(groupId, future.get());
           }
-        }
-
-        // set endOffsets as the next startOffsets
-        // If we received invalid endOffset values, we clear the known offset to refetch the last committed offset
-        // from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure.
-        if (!endOffsetsAreInvalid) {
-          for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
-            partitionOffsets.put(entry.getKey(), entry.getValue());
+          catch (Exception e1) {
+            log.warn(e1, "Get future result failed for groupId[%d]", groupId);
           }
         } else {
-          for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
-            partitionOffsets.put(entry.getKey(), getNotSetMarker());
-          }
+          log.warn("GroupId [%d] checking task not finished", groupId);

Review Comment:
   ```suggestion
             log.debug("Could not finish checkpoint for groupId [%d]", groupId);
   ```
   
   Changed these logs to debug as they are only useful while debugging. The groupId integer by itself doesn't give much information, so there is little point in flooding the info logs with these lines.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2824,60 +2824,81 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
       }
     }
 
-    List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures)
-                                                                    .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    for (int j = 0; j < results.size(); j++) {
-      Integer groupId = futureGroupIds.get(j);
-      TaskGroup group = activelyReadingTaskGroups.get(groupId);
-      Map<PartitionIdType, SequenceOffsetType> endOffsets = results.get(j);
-
-      if (endOffsets != null) {
-        // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
-        group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
+    try {
+      Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+    }
+    finally {
+      for (int i = 0; i < futureGroupIds.size(); ++i) {
+        Integer groupId = futureGroupIds.get(i);
+        ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = futures.get(i);
 
+        if (future.isDone()) {
+          log.info("GroupId [%d] checking task finished", groupId);

Review Comment:
   ```suggestion
             log.debug("Finished checkpoint for groupId [%d]", groupId);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -423,6 +469,35 @@ public ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(fin
   protected abstract Class<PartitionIdType> getPartitionType();
 
   protected abstract Class<SequenceOffsetType> getSequenceType();
+
+  @VisibleForTesting
+  public class TaskPauseControlInfo

Review Comment:
   The only use of this class seems to be to have the `running` field which tracks whether a task is required to be paused or not. I think we can avoid having this class altogether if we use the other suggested changes.
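
   To illustrate (a minimal sketch with made-up names, just to show that the map entry itself can carry the same information as the `running` flag):
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Future;

   // Sketch: the presence of a future in the map already answers
   // "is a pause still pending for this task?", so no wrapper class is needed.
   class PausingTaskTracker
   {
     private final Map<String, Future<?>> pausingTaskFutureMap = new ConcurrentHashMap<>();

     void trackPause(String taskId, Future<?> pauseFuture)
     {
       pausingTaskFutureMap.put(taskId, pauseFuture);
     }

     boolean isPausePending(String taskId)
     {
       // equivalent to the old "running == true"
       return pausingTaskFutureMap.containsKey(taskId);
     }

     void cancelAll()
     {
       pausingTaskFutureMap.clear();
     }
   }
   ```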



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -131,7 +137,7 @@ public Map<PartitionIdType, SequenceOffsetType> pause(final String id)
       } else if (responseStatus.equals(HttpResponseStatus.ACCEPTED)) {
         // The task received the pause request, but its status hasn't been changed yet.
         final RetryPolicy retryPolicy = newRetryPolicy();
-        while (true) {
+        while (pausingTaskFutureMap.getOrDefault(id, DEFAULT_TASK_PAUSE_CONTROL_INFO).isRunning()) {

Review Comment:
   ```suggestion
           while (pausingTaskFutureMap.containsKey(id)) {
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -156,6 +162,9 @@ public Map<PartitionIdType, SequenceOffsetType> pause(final String id)
             Thread.sleep(sleepTime);
           }
         }
+
+        log.info("Task [%s] pause timeout, force to be finished", id);

Review Comment:
   ```suggestion
           log.debug("Could not finish pausing task [%s]. Pause request has been cancelled.", id);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -376,10 +385,47 @@ public ListenableFuture<DateTime> getStartTimeAsync(final String id)
     return doAsync(() -> getStartTime(id));
   }
 
-
   public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
   {
-    return doAsync(() -> pause(id));
+    ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = doAsync(() -> pause(id));
+    pausingTaskFutureMap.put(id, new TaskPauseControlInfo(future, true));

Review Comment:
   We should also add a callback on this future to remove entries from the map when the future completes. Otherwise, the map `pausingTaskFutureMap` will keep growing in size.
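
   For instance, something along these lines (rough sketch only, assuming the map stores the futures directly as suggested in the other comments; note that newer Guava versions also require an explicit executor for `addCallback`):
   ```java
   // needs com.google.common.util.concurrent.FutureCallback
   public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
   {
     final ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = doAsync(() -> pause(id));
     pausingTaskFutureMap.put(id, future);
     Futures.addCallback(
         future,
         new FutureCallback<Map<PartitionIdType, SequenceOffsetType>>()
         {
           @Override
           public void onSuccess(Map<PartitionIdType, SequenceOffsetType> result)
           {
             // pause finished, stop tracking it so the map does not grow unbounded
             pausingTaskFutureMap.remove(id);
           }

           @Override
           public void onFailure(Throwable t)
           {
             pausingTaskFutureMap.remove(id);
           }
         }
     );
     return future;
   }
   ```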



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java:
##########
@@ -753,7 +753,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
     Assert.assertEquals(1, taskCountBeforeScaleOut);
     Thread.sleep(1000);
     int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
-    Assert.assertEquals(2, taskCountAfterScaleOut);
+    Assert.assertEquals(1, taskCountAfterScaleOut);

Review Comment:
   Could you please clarify why these values had to be changed?
   I assume it's because some tasks are now being successfully killed/paused, but just want to be sure.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2824,60 +2824,81 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
       }
     }
 
-    List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures)
-                                                                    .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
-    for (int j = 0; j < results.size(); j++) {
-      Integer groupId = futureGroupIds.get(j);
-      TaskGroup group = activelyReadingTaskGroups.get(groupId);
-      Map<PartitionIdType, SequenceOffsetType> endOffsets = results.get(j);
-
-      if (endOffsets != null) {
-        // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion
-        group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
-        pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);
+    try {
+      Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+    }
+    finally {
+      for (int i = 0; i < futureGroupIds.size(); ++i) {
+        Integer groupId = futureGroupIds.get(i);
+        ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = futures.get(i);
 
+        if (future.isDone()) {
+          log.info("GroupId [%d] checking task finished", groupId);
 
-        boolean endOffsetsAreInvalid = false;
-        for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
-          if (entry.getValue().equals(getEndOfPartitionMarker())) {
-            log.info(
-                "Got end of partition marker for partition [%s] in checkTaskDuration, not updating partition offset.",
-                entry.getKey()
-            );
-            endOffsetsAreInvalid = true;
+          try {
+            this.moveGroupFromReadingToPending(groupId, future.get());
           }
-        }
-
-        // set endOffsets as the next startOffsets
-        // If we received invalid endOffset values, we clear the known offset to refetch the last committed offset
-        // from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure.
-        if (!endOffsetsAreInvalid) {
-          for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
-            partitionOffsets.put(entry.getKey(), entry.getValue());
+          catch (Exception e1) {
+            log.warn(e1, "Get future result failed for groupId[%d]", groupId);
           }
         } else {
-          for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
-            partitionOffsets.put(entry.getKey(), getNotSetMarker());
-          }
+          log.warn("GroupId [%d] checking task not finished", groupId);
         }
-      } else {
-        for (String id : group.taskIds()) {
-          killTask(
-              id,
-              "All tasks in group [%s] failed to transition to publishing state",
-              groupId
+      }
+
+      taskClient.cancelTaskPauseRequests();
+    }
+  }
+
+  protected void moveGroupFromReadingToPending(Integer groupId, Map<PartitionIdType, SequenceOffsetType> endOffsets)

Review Comment:
   ```suggestion
     protected void moveTaskGroupToPendingCompletion(Integer groupId, Map<PartitionIdType, SequenceOffsetType> endOffsets)
   ```
   This name would be better as there is already a method with this name that does something similar, so we can just overload that one and keep the functionality clear.



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientTest.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.IndexTaskClient;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Either;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
+import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class SeekableStreamIndexTaskClientTest extends EasyMockSupport
+{
+  private static final String TEST_ID = "test-id";
+  private static final List<String> TEST_IDS = Arrays.asList("test-id1", "test-id2", "test-id3", "test-id4");
+  private static final String TEST_HOST = "test-host";
+  private static final int TEST_PORT = 1234;
+  private static final int TEST_TLS_PORT = -1;
+  private static final String TEST_DATASOURCE = "test-datasource";
+  private static final Duration TEST_HTTP_TIMEOUT = new Duration(5000);
+  private static final long TEST_NUM_RETRIES = 0;
+  private static final String URL_FORMATTER = "http://%s:%d/druid/worker/v1/chat/%s/%s";
+
+  private MySeekableStreamIndexTaskClient taskClient;
+  private HttpClient httpClient;
+  private ObjectMapper jsonMapper = new DefaultObjectMapper();
+  private TaskInfoProvider taskInfoProvider;
+  private StringFullResponseHolder responseHolder;
+  private HttpResponse response;
+  private HttpHeaders headers;
+
+  @Before
+  public void setUp()

Review Comment:
   Thanks for adding tests!



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1783,41 +1783,49 @@ public Response pauseHTTP(
   @VisibleForTesting
   public Response pause() throws InterruptedException
   {
-    if (!(status == Status.PAUSED || status == Status.READING)) {
+    if (status == Status.NOT_STARTED || status == Status.STARTING) {
       return Response.status(Response.Status.BAD_REQUEST)
-                     .type(MediaType.TEXT_PLAIN)
-                     .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
-                     .build();
+              .type(MediaType.TEXT_PLAIN)
+              .entity(StringUtils.format("Can't pause, task state is invalid (state: [%s])", status))

Review Comment:
   ```suggestion
                 .entity(StringUtils.format("Cannot pause task in state [%s]", status))
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1783,41 +1783,49 @@ public Response pauseHTTP(
   @VisibleForTesting
   public Response pause() throws InterruptedException
   {
-    if (!(status == Status.PAUSED || status == Status.READING)) {
+    if (status == Status.NOT_STARTED || status == Status.STARTING) {
       return Response.status(Response.Status.BAD_REQUEST)
-                     .type(MediaType.TEXT_PLAIN)
-                     .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
-                     .build();
+              .type(MediaType.TEXT_PLAIN)
+              .entity(StringUtils.format("Can't pause, task state is invalid (state: [%s])", status))
+              .build();
     }
 
-    pauseLock.lockInterruptibly();
-    try {
-      pauseRequested = true;
+    // if status in [PAUSED, READING], need to pause
+    // if status == PUBLISHING, return current offset, not to report exception
+    if (status == Status.PAUSED || status == Status.READING) {
+      log.info("Task state is pausable, taskId: [%s], state: [%s])", task.getId(), status);

Review Comment:
   ```suggestion
         log.info("Pausing taskId [%s] in state [%s])", task.getId(), status);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1783,41 +1783,49 @@ public Response pauseHTTP(
   @VisibleForTesting
   public Response pause() throws InterruptedException
   {
-    if (!(status == Status.PAUSED || status == Status.READING)) {
+    if (status == Status.NOT_STARTED || status == Status.STARTING) {
       return Response.status(Response.Status.BAD_REQUEST)
-                     .type(MediaType.TEXT_PLAIN)
-                     .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
-                     .build();
+              .type(MediaType.TEXT_PLAIN)
+              .entity(StringUtils.format("Can't pause, task state is invalid (state: [%s])", status))
+              .build();
     }
 
-    pauseLock.lockInterruptibly();
-    try {
-      pauseRequested = true;
+    // if status in [PAUSED, READING], need to pause
+    // if status == PUBLISHING, return current offset, not to report exception
+    if (status == Status.PAUSED || status == Status.READING) {
+      log.info("Task state is pausable, taskId: [%s], state: [%s])", task.getId(), status);
 
-      pollRetryLock.lockInterruptibly();
+      pauseLock.lockInterruptibly();
       try {
-        isAwaitingRetry.signalAll();
-      }
-      finally {
-        pollRetryLock.unlock();
-      }
+        pauseRequested = true;
 
-      if (isPaused()) {
-        shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
-      }
+        pollRetryLock.lockInterruptibly();
+        try {
+          isAwaitingRetry.signalAll();
+        }
+        finally {
+          pollRetryLock.unlock();
+        }
 
-      long nanos = TimeUnit.SECONDS.toNanos(2);
-      while (!isPaused()) {
-        if (nanos <= 0L) {
-          return Response.status(Response.Status.ACCEPTED)
-                         .entity("Request accepted but task has not yet paused")
-                         .build();
+        if (isPaused()) {
+          shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
+        }
+
+        long nanos = TimeUnit.SECONDS.toNanos(2);
+        while (!isPaused()) {
+          if (nanos <= 0L) {
+            return Response.status(Response.Status.ACCEPTED)
+                    .entity("Request accepted but task has not yet paused")
+                    .build();
+          }
+          nanos = hasPaused.awaitNanos(nanos);
         }
-        nanos = hasPaused.awaitNanos(nanos);
       }
-    }
-    finally {
-      pauseLock.unlock();
+      finally {
+        pauseLock.unlock();
+      }
+    } else {
+      log.info("Return current offsets directly, taskId: [%s], state: [%s]", task.getId(), status);

Review Comment:
   ```suggestion
         log.info("Need not pause taskId [%s] as it is already in state [%s]", task.getId(), status);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -55,6 +57,10 @@
 
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClient.class);
 
+  private final TaskPauseControlInfo DEFAULT_TASK_PAUSE_CONTROL_INFO = new TaskPauseControlInfo(null, true);

Review Comment:
   ```suggestion
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -376,10 +385,47 @@ public ListenableFuture<DateTime> getStartTimeAsync(final String id)
     return doAsync(() -> getStartTime(id));
   }
 
-
   public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
   {
-    return doAsync(() -> pause(id));
+    ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = doAsync(() -> pause(id));
+    pausingTaskFutureMap.put(id, new TaskPauseControlInfo(future, true));

Review Comment:
   ```suggestion
       pausingTaskFutureMap.put(id, future);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -376,10 +385,47 @@ public ListenableFuture<DateTime> getStartTimeAsync(final String id)
     return doAsync(() -> getStartTime(id));
   }
 
-
   public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
   {
-    return doAsync(() -> pause(id));
+    ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = doAsync(() -> pause(id));
+    pausingTaskFutureMap.put(id, new TaskPauseControlInfo(future, true));
+    return future;
+  }
+
+  public void cancelTaskPauseRequests()
+  {
+    for (Map.Entry<String, TaskPauseControlInfo> entry : pausingTaskFutureMap.entrySet()) {
+      String taskId = entry.getKey();
+      TaskPauseControlInfo taskPauseControl = entry.getValue();
+
+      if (!taskPauseControl.getFuture().isDone()) {
+        this.stopPausingTask(taskId, taskPauseControl);
+        log.info("Cancel unfinished pause task [%s]", taskId);
+      } else {
+        log.info("Finished pause task [%s]", taskId);
+      }
+    }
+
+    pausingTaskFutureMap.clear();
+  }
+
+  @VisibleForTesting
+  public int getPauseFutureSize()
+  {
+    return pausingTaskFutureMap.size();
+  }
+
+  @VisibleForTesting
+  protected void stopPausingTask(String taskId, TaskPauseControlInfo taskPauseControl)

Review Comment:
   This method would not be needed with the suggested changes.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -376,10 +385,47 @@ public ListenableFuture<DateTime> getStartTimeAsync(final String id)
     return doAsync(() -> getStartTime(id));
   }
 
-
   public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
   {
-    return doAsync(() -> pause(id));
+    ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = doAsync(() -> pause(id));
+    pausingTaskFutureMap.put(id, new TaskPauseControlInfo(future, true));
+    return future;
+  }
+
+  public void cancelTaskPauseRequests()
+  {
+    for (Map.Entry<String, TaskPauseControlInfo> entry : pausingTaskFutureMap.entrySet()) {
+      String taskId = entry.getKey();
+      TaskPauseControlInfo taskPauseControl = entry.getValue();
+
+      if (!taskPauseControl.getFuture().isDone()) {
+        this.stopPausingTask(taskId, taskPauseControl);
+        log.info("Cancel unfinished pause task [%s]", taskId);
+      } else {
+        log.info("Finished pause task [%s]", taskId);
+      }
+    }
+
+    pausingTaskFutureMap.clear();
+  }
+
+  @VisibleForTesting
+  public int getPauseFutureSize()

Review Comment:
   We shouldn't need to expose this. There should be other ways to verify that the pause has been cancelled for a given taskId. 
   
   For example, I can see that while retrying to pause, we keep polling the task status (`getStatus`). In the test, we can assert that once the pause requests have been cancelled, `getStatus` is not called anymore.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -376,10 +385,47 @@ public ListenableFuture<DateTime> getStartTimeAsync(final String id)
     return doAsync(() -> getStartTime(id));
   }
 
-
   public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(final String id)
   {
-    return doAsync(() -> pause(id));
+    ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> future = doAsync(() -> pause(id));
+    pausingTaskFutureMap.put(id, new TaskPauseControlInfo(future, true));
+    return future;
+  }
+
+  public void cancelTaskPauseRequests()
+  {
+    for (Map.Entry<String, TaskPauseControlInfo> entry : pausingTaskFutureMap.entrySet()) {
+      String taskId = entry.getKey();
+      TaskPauseControlInfo taskPauseControl = entry.getValue();
+
+      if (!taskPauseControl.getFuture().isDone()) {
+        this.stopPausingTask(taskId, taskPauseControl);
+        log.info("Cancel unfinished pause task [%s]", taskId);
+      } else {
+        log.info("Finished pause task [%s]", taskId);
+      }
+    }

Review Comment:
   ```suggestion
       for (Map.Entry<String, Future> entry : pausingTaskFutureMap.entrySet()) {
         String taskId = entry.getKey();
         Future taskPauseFuture = entry.getValue();
   
         if (!taskPauseFuture.isDone()) {
           log.debug("Cancel unfinished pause of task [%s]", taskId);
         } else {
           log.debug("Task [%s] is already paused", taskId);
         }
       }
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java:
##########
@@ -55,6 +57,10 @@
 
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClient.class);
 
+  private final TaskPauseControlInfo DEFAULT_TASK_PAUSE_CONTROL_INFO = new TaskPauseControlInfo(null, true);
+
+  private ConcurrentHashMap<String, TaskPauseControlInfo> pausingTaskFutureMap = new ConcurrentHashMap<>();

Review Comment:
   ```suggestion
     private ConcurrentHashMap<String, Future> pausingTaskFutureMap = new ConcurrentHashMap<>();
   ```



##########
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java:
##########
@@ -1054,6 +1054,36 @@ public void testSetEndOffsetsAsyncWithResume() throws Exception
     }
   }
 
+  @Test
+  public void testCancelTaskPauseRequests() throws Exception
+  {
+    final int numRequests = TEST_IDS.size();
+    Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
+    EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
+    EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
+    EasyMock.expect(httpClient.go(
+            EasyMock.capture(captured),
+            EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
+            EasyMock.eq(TEST_HTTP_TIMEOUT)
+    )).andReturn(
+            okResponseHolder()
+    ).times(numRequests);
+    replayAll();
+
+    List<ListenableFuture<Map<Integer, Long>>> futures = new ArrayList<>();
+    for (String testId : TEST_IDS) {
+      futures.add(client.pauseAsync(testId));
+    }
+
+    Assert.assertEquals(client.getPauseFutureSize(), TEST_IDS.size());
+
+    Futures.allAsList(futures).get();
+    verifyAll();
+
+    client.cancelTaskPauseRequests();
+    Assert.assertEquals(client.getPauseFutureSize(), 0);

Review Comment:
   This assertion can lead to flaky tests, as it is possible that at this point some of the pauses have already succeeded (or failed), thus changing the value returned by `getPauseFutureSize`.
   
   Added some suggestions in `SeekableStreamIndexTaskClient` to rewrite these tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

