You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/08/13 00:42:16 UTC

[incubator-druid] branch master updated: Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks (#8236)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 312cdc2  Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks (#8236)
312cdc2 is described below

commit 312cdc245239aa1411665e91eaeeadfd13301d25
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Aug 12 17:42:06 2019 -0700

    Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks (#8236)
    
    * Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks
    
    * kill runner when it's ready
    
    * add comment
    
    * kill run thread
    
    * fix test
    
    * Take closeable out of Appenderator
    
    * add javadoc
    
    * fix test
    
    * fix test
    
    * update javadoc
    
    * add javadoc about killed task
    
    * address comment
    
    * handling missing exceptions
    
    * more clear javadoc for stopGracefully
    
    * update javadoc
    
    * Add missing statement in javadoc
    
    * typo
---
 .../common/task/AbstractBatchIndexTask.java        | 65 +++++++++++++++++++
 .../common/task/AbstractFixedIntervalTask.java     |  6 ++
 .../druid/indexing/common/task/AbstractTask.java   | 11 ----
 .../task/AppenderatorDriverRealtimeIndexTask.java  |  2 +-
 .../druid/indexing/common/task/CompactionTask.java | 39 ++++++++++--
 .../indexing/common/task/HadoopIndexTask.java      | 55 +++++++---------
 .../druid/indexing/common/task/IndexTask.java      | 64 +++++++------------
 .../druid/indexing/common/task/NoopTask.java       |  6 ++
 .../apache/druid/indexing/common/task/Task.java    | 18 +++++-
 .../indexing/common/task/TaskResourceCleaner.java  | 73 ++++++++++++++++++++++
 .../task/batch/parallel/ParallelIndexSubTask.java  | 46 +++++---------
 .../parallel/ParallelIndexSupervisorTask.java      | 64 ++++++-------------
 .../apache/druid/indexing/common/TestTasks.java    | 11 ++++
 .../druid/indexing/common/task/HadoopTaskTest.java |  7 ++-
 .../ParallelIndexSupervisorTaskKillTest.java       | 11 +++-
 .../druid/indexing/overlord/RealtimeishTask.java   |  6 ++
 .../druid/indexing/overlord/TaskLifecycleTest.java | 15 +++++
 .../druid/indexing/overlord/TaskLockboxTest.java   |  6 ++
 .../overlord/http/OverlordResourceTest.java        |  6 ++
 .../realtime/appenderator/Appenderator.java        |  4 +-
 .../DefaultOfflineAppenderatorFactoryTest.java     |  8 ++-
 21 files changed, 347 insertions(+), 176 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index b00c646..56dccbe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -22,12 +22,15 @@ package org.apache.druid.indexing.common.task;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.indexing.firehose.WindowedSegmentId;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -43,6 +46,7 @@ import org.joda.time.Interval;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -50,6 +54,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -64,12 +69,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
 
   private final SegmentLockHelper segmentLockHelper;
 
+  @GuardedBy("this")
+  private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();
+
   /**
    * State to indicate that this task will use segmentLock or timeChunkLock.
    * This is automatically set when {@link #determineLockGranularityandTryLock} is called.
    */
   private boolean useSegmentLock;
 
+  @GuardedBy("this")
+  private boolean stopped = false;
+
   protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
   {
     super(id, dataSource, context);
@@ -89,6 +100,60 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
   }
 
   /**
+   * Run this task. Before running the task, it checks the the current task is already stopped and
+   * registers a cleaner to interrupt the thread running this task on abnormal exits.
+   *
+   * @see #runTask(TaskToolbox)
+   * @see #stopGracefully(TaskConfig)
+   */
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    synchronized (this) {
+      if (stopped) {
+        return TaskStatus.failure(getId());
+      } else {
+        // Register the cleaner to interrupt the current thread first.
+        // Since the resource closer cleans up the registered resources in LIFO order,
+        // this will be executed last on abnormal exists.
+        // The order is sometimes important. For example, Appenderator has two methods of close() and closeNow(), and
+        // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could lead to close()
+        // to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be
+        // called before the current thread is interrupted, so that subsequent close() calls can be ignored.
+        final Thread currentThread = Thread.currentThread();
+        resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt());
+      }
+    }
+    return runTask(toolbox);
+  }
+
+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+    synchronized (this) {
+      stopped = true;
+      resourceCloserOnAbnormalExit.clean(taskConfig);
+    }
+  }
+
+  /**
+   * Registers a resource cleaner which is executed on abnormal exits.
+   *
+   * @see Task#stopGracefully
+   */
+  protected void registerResourceCloserOnAbnormalExit(Consumer<TaskConfig> cleaner)
+  {
+    synchronized (this) {
+      resourceCloserOnAbnormalExit.register(cleaner);
+    }
+  }
+
+  /**
+   * The method to acutally process this task. This method is executed in {@link #run(TaskToolbox)}.
+   */
+  public abstract TaskStatus runTask(TaskToolbox toolbox) throws Exception;
+
+  /**
    * Return true if this task can overwrite existing segments.
    */
   public abstract boolean requireLockExistingSegments();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
index 9582b98..3b117de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.joda.time.Interval;
 
 import java.util.Map;
@@ -80,4 +81,9 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
   {
     return interval;
   }
+
+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 370b30e..c901e68 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -154,16 +153,6 @@ public abstract class AbstractTask implements Task
     return false;
   }
 
-  /**
-   * Should be called independent of canRestore so that resource cleaning can be achieved.
-   * If resource cleaning is required, concrete class should override this method
-   */
-  @Override
-  public void stopGracefully(TaskConfig taskConfig)
-  {
-    // Do nothing and let the concrete class handle it
-  }
-
   @Override
   public String toString()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 3c22d20..564c965 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -437,7 +437,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
       }
 
       CloseQuietly.close(firehose);
-      CloseQuietly.close(appenderator);
+      appenderator.close();
       CloseQuietly.close(driver);
 
       toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 8aee62f..4e550f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -100,6 +100,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
@@ -109,6 +110,12 @@ public class CompactionTask extends AbstractBatchIndexTask
   private static final Logger log = new Logger(CompactionTask.class);
   private static final String TYPE = "compact";
 
+  /**
+   * A flag to indicate this task is already stopped and its child indexTasks shouldn't be created.
+   * See {@link #currentRunningTaskSpec} for more details.
+   */
+  private static final Object SPECIAL_VALUE_STOPPED = new Object();
+
   private final Interval interval;
   private final List<DataSegment> segments;
   @Nullable
@@ -146,10 +153,22 @@ public class CompactionTask extends AbstractBatchIndexTask
   private final RetryPolicyFactory retryPolicyFactory;
 
   @JsonIgnore
-  private List<IndexTask> indexTaskSpecs;
+  private final AppenderatorsManager appenderatorsManager;
 
   @JsonIgnore
-  private AppenderatorsManager appenderatorsManager;
+  private List<IndexTask> indexTaskSpecs;
+
+  /**
+   * Reference to the sub-task that is currently running.
+   *
+   * When {@link #stopGracefully} is called, the compaction task gets the reference to the current running task,
+   * and calls {@link #stopGracefully} for that task. This reference will be updated to {@link #SPECIAL_VALUE_STOPPED}.
+   *
+   * Note that {@link #stopGracefully} can be called at any time during {@link #run}. Calling {@link #stopGracefully}
+   * on the current running task and setting this reference to {@link #SPECIAL_VALUE_STOPPED} should be done atomically.
+   */
+  @Nullable
+  private final AtomicReference<Object> currentRunningTaskSpec = new AtomicReference<>();
 
   @JsonCreator
   public CompactionTask(
@@ -289,7 +308,7 @@ public class CompactionTask extends AbstractBatchIndexTask
   }
 
   @Override
-  public TaskStatus run(final TaskToolbox toolbox) throws Exception
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
     if (indexTaskSpecs == null) {
       final List<IndexIngestionSpec> ingestionSpecs = createIngestionSchema(
@@ -330,12 +349,24 @@ public class CompactionTask extends AbstractBatchIndexTask
       log.info("Generated [%d] compaction task specs", totalNumSpecs);
 
       int failCnt = 0;
+      registerResourceCloserOnAbnormalExit(config -> {
+        Object currentRunningTask = currentRunningTaskSpec.getAndSet(SPECIAL_VALUE_STOPPED);
+        if (currentRunningTask != null) {
+          ((IndexTask) currentRunningTask).stopGracefully(config);
+        }
+      });
       for (IndexTask eachSpec : indexTaskSpecs) {
+        Object prevSpec = currentRunningTaskSpec.get();
+        //noinspection ObjectEquality
+        if (prevSpec == SPECIAL_VALUE_STOPPED || !currentRunningTaskSpec.compareAndSet(prevSpec, eachSpec)) {
+          log.info("Task is asked to stop. Finish as failed.");
+          return TaskStatus.failure(getId());
+        }
         final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
-        log.info("Running indexSpec: " + json);
 
         try {
           if (eachSpec.isReady(toolbox.getTaskActionClient())) {
+            log.info("Running indexSpec: " + json);
             final TaskStatus eachResult = eachSpec.run(toolbox);
             if (!eachResult.isSuccess()) {
               failCnt++;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 7557242..86ac882 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -128,13 +128,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @JsonIgnore
   private String errorMsg;
 
-  @JsonIgnore
-  private Thread runThread;
-
-  @JsonIgnore
-  private boolean stopped = false;
-
-
   /**
    * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
    *             for creating Druid index segments. It may be modified.
@@ -264,22 +257,24 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     return classpathPrefix;
   }
 
-  public String getHadoopJobIdFileName()
+  private String getHadoopJobIdFileName()
   {
-    return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
+    return getHadoopJobIdFile().getAbsolutePath();
   }
 
-  @Override
-  public TaskStatus run(TaskToolbox toolbox)
+  private boolean hadoopJobIdFileExists()
   {
-    synchronized (this) {
-      if (stopped) {
-        return TaskStatus.failure(getId());
-      } else {
-        runThread = Thread.currentThread();
-      }
-    }
+    return getHadoopJobIdFile().exists();
+  }
 
+  private File getHadoopJobIdFile()
+  {
+    return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME);
+  }
+
+  @Override
+  public TaskStatus runTask(TaskToolbox toolbox)
+  {
     try {
       taskConfig = toolbox.getConfig();
       if (chatHandlerProvider.isPresent()) {
@@ -319,6 +314,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @SuppressWarnings("unchecked")
   private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
   {
+    registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
     String hadoopJobIdFile = getHadoopJobIdFileName();
     final ClassLoader loader = buildClassLoader(toolbox);
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
@@ -475,33 +471,25 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
     }
   }
 
-  @Override
-  public void stopGracefully(TaskConfig taskConfig)
+  private void killHadoopJob()
   {
-    synchronized (this) {
-      stopped = true;
-      if (runThread == null) {
-        // didn't actually start, just return
-        return;
-      }
-    }
     // To avoid issue of kill command once the ingestion task is actually completed
-    if (!ingestionState.equals(IngestionState.COMPLETED)) {
+    if (hadoopJobIdFileExists() && !ingestionState.equals(IngestionState.COMPLETED)) {
       final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
       String hadoopJobIdFile = getHadoopJobIdFileName();
 
       try {
-        ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
-            taskConfig.getDefaultHadoopCoordinates());
+        ClassLoader loader = HadoopTask.buildClassLoader(
+            getHadoopDependencyCoordinates(),
+            taskConfig.getDefaultHadoopCoordinates()
+        );
 
         Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
             "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
             loader
         );
 
-        String[] buildKillJobInput = new String[]{
-            hadoopJobIdFile
-        };
+        String[] buildKillJobInput = new String[]{hadoopJobIdFile};
 
         Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
         Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
@@ -519,7 +507,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       }
       finally {
         Thread.currentThread().setContextClassLoader(oldLoader);
-        runThread.interrupt();
       }
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 16254de..c314cef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
@@ -178,15 +177,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   @JsonIgnore
   private final AppenderatorsManager appenderatorsManager;
 
-  @JsonIgnore
-  private Thread runThread;
-
-  @JsonIgnore
-  private boolean stopped = false;
-
-  @JsonIgnore
-  private Appenderator appenderator;
-
   @JsonCreator
   public IndexTask(
       @JsonProperty("id") final String id,
@@ -421,16 +411,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   }
 
   @Override
-  public TaskStatus run(final TaskToolbox toolbox)
+  public TaskStatus runTask(final TaskToolbox toolbox)
   {
-    synchronized (this) {
-      if (stopped) {
-        return TaskStatus.failure(getId());
-      } else {
-        runThread = Thread.currentThread();
-      }
-    }
-
     try {
       if (chatHandlerProvider.isPresent()) {
         log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@@ -513,23 +495,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
-  @Override
-  public void stopGracefully(TaskConfig taskConfig)
-  {
-    synchronized (this) {
-      stopped = true;
-      // Nothing else to do for native batch except terminate
-      if (ingestionState != IngestionState.COMPLETED) {
-        if (appenderator != null) {
-          appenderator.closeNow();
-        }
-        if (runThread != null) {
-          runThread.interrupt();
-        }
-      }
-    }
-  }
-
   private Map<String, TaskReport> getTaskCompletionReports()
   {
     return TaskReport.buildTaskReports(
@@ -925,17 +890,17 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         toolbox.getTaskActionClient()
                .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));
 
+    final Appenderator appenderator = newAppenderator(
+        buildSegmentsFireDepartmentMetrics,
+        toolbox,
+        dataSchema,
+        tuningConfig
+    );
+    boolean exceptionOccurred = false;
     try (
-        final Appenderator appenderator = newAppenderator(
-            buildSegmentsFireDepartmentMetrics,
-            toolbox,
-            dataSchema,
-            tuningConfig
-        );
         final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
         final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
     ) {
-      this.appenderator = appenderator;
 
       driver.startJob();
 
@@ -1007,6 +972,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
                                              : null;
       // Probably we can publish atomicUpdateGroup along with segments.
       final SegmentsAndMetadata published = awaitPublish(driver.publishAll(inputSegments, publisher), pushTimeout);
+      appenderator.close();
 
       ingestionState = IngestionState.COMPLETED;
       if (published == null) {
@@ -1031,8 +997,20 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       }
     }
     catch (TimeoutException | ExecutionException e) {
+      exceptionOccurred = true;
       throw new RuntimeException(e);
     }
+    catch (Exception e) {
+      exceptionOccurred = true;
+      throw e;
+    }
+    finally {
+      if (exceptionOccurred) {
+        appenderator.closeNow();
+      } else {
+        appenderator.close();
+      }
+    }
   }
 
   private void handleParseException(ParseException e)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index 1bbe4ee..8fcf252 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -139,6 +140,11 @@ public class NoopTask extends AbstractTask
   }
 
   @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+  }
+
+  @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     if (firehoseFactory != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 45aabd8..097bf74 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -161,8 +161,22 @@ public interface Task
   boolean canRestore();
 
   /**
-   * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with
-   * extreme prejudice.
+   * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be
+   * terminated with extreme prejudice.
+   *
+   * This method can be called at any time no matter when {@link #run} is executed. Regardless of when this method is
+   * called with respect to {@link #run}, its implementations must not allow a resource leak or lingering executions
+   * (local or remote).
+   *
+   * Depending on the task executor type, one of the two cases below can happen when the task is killed.
+   * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills
+   *   the process running the task, which triggers
+   *   {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}.
+   * - When the task is executed by an indexer, {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner#shutdown}
+   *   calls this method directly.
+   *
+   * If the task has some resources to clean up on abnormal exit, e.g., sub tasks of parallel indexing task
+   * or Hadoop jobs spawned by Hadoop indexing tasks, those resource cleanups should be done in this method.
    *
    * @param taskConfig TaskConfig for this task
    */
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java
new file mode 100644
index 0000000..a96cd70
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexing.common.config.TaskConfig;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.function.Consumer;
+
+/**
+ * Executes all registered {@link Consumer}s in LIFO order.
+ * Similar to {@link org.apache.druid.java.util.common.io.Closer}, but this class is tweaked to be used in
+ * {@link Task#stopGracefully(TaskConfig)}.
+ */
+public class TaskResourceCleaner
+{
+  private final Deque<Consumer<TaskConfig>> stack = new ArrayDeque<>(4);
+
+  public void register(Consumer<TaskConfig> cleaner)
+  {
+    stack.addFirst(cleaner);
+  }
+
+  public void clean(TaskConfig config)
+  {
+    Throwable throwable = null;
+
+    // Clean up in LIFO order
+    while (!stack.isEmpty()) {
+      final Consumer<TaskConfig> cleaner = stack.removeFirst();
+      try {
+        cleaner.accept(config);
+      }
+      catch (Throwable t) {
+        if (throwable == null) {
+          throwable = t;
+        } else {
+          suppress(throwable, t);
+        }
+      }
+    }
+
+    if (throwable != null) {
+      throw new RuntimeException(throwable);
+    }
+  }
+
+  private void suppress(Throwable thrown, Throwable suppressed)
+  {
+    //noinspection ObjectEquality
+    if (thrown != suppressed) {
+      thrown.addSuppressed(suppressed);
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index fb902cb..3e3cf80 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SurrogateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.IndexTask;
@@ -107,10 +106,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
   private final IndexTaskClientFactory<ParallelIndexTaskClient> taskClientFactory;
   private final AppenderatorsManager appenderatorsManager;
 
-  private Appenderator appenderator;
-  private Thread runThread;
-  private boolean stopped = false;
-
   /**
    * If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode".
    * In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks.
@@ -205,16 +200,8 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
   }
 
   @Override
-  public TaskStatus run(final TaskToolbox toolbox) throws Exception
+  public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
   {
-    synchronized (this) {
-      if (stopped) {
-        return TaskStatus.failure(getId());
-      } else {
-        runThread = Thread.currentThread();
-      }
-    }
-
     if (missingIntervalsInOverwriteMode) {
       LOG.warn(
           "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
@@ -425,12 +412,12 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
     final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
     final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient);
 
+    final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig);
+    boolean exceptionOccurred = false;
     try (
-        final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig);
         final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
         final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
     ) {
-      this.appenderator = appenderator;
       driver.startJob();
 
       final Set<DataSegment> pushedSegments = new HashSet<>();
@@ -496,12 +483,25 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
       final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
       pushedSegments.addAll(pushed.getSegments());
       LOG.info("Pushed segments[%s]", pushed.getSegments());
+      appenderator.close();
 
       return pushedSegments;
     }
     catch (TimeoutException | ExecutionException e) {
+      exceptionOccurred = true;
       throw new RuntimeException(e);
     }
+    catch (Exception e) {
+      exceptionOccurred = true;
+      throw e;
+    }
+    finally {
+      if (exceptionOccurred) {
+        appenderator.closeNow();
+      } else {
+        appenderator.close();
+      }
+    }
   }
 
   private Appenderator newAppenderator(
@@ -536,18 +536,4 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
         toolbox.getDataSegmentKiller()
     );
   }
-
-  @Override
-  public void stopGracefully(TaskConfig taskConfig)
-  {
-    synchronized (this) {
-      stopped = true;
-      if (appenderator != null) {
-        appenderator.closeNow();
-      }
-      if (runThread != null) {
-        runThread.interrupt();
-      }
-    }
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index c8cd3d9..e6ef2c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.IndexTask;
@@ -128,8 +127,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   private volatile ParallelIndexTaskRunner runner;
   private volatile IndexTask sequentialIndexTask;
 
-  private boolean stopped = false;
-
   // toolbox is initlized when run() is called, and can be used for processing HTTP endpoint requests.
   private volatile TaskToolbox toolbox;
 
@@ -287,20 +284,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   }
 
   @Override
-  public void stopGracefully(TaskConfig taskConfig)
-  {
-    synchronized (this) {
-      stopped = true;
-    }
-    if (runner != null) {
-      runner.stopGracefully();
-    } else if (sequentialIndexTask != null) {
-      sequentialIndexTask.stopGracefully(taskConfig);
-    }
-  }
-
-  @Override
-  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
     if (missingIntervalsInOverwriteMode) {
       LOG.warn(
@@ -357,39 +341,31 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
   private TaskStatus runParallel(TaskToolbox toolbox) throws Exception
   {
-    synchronized (this) {
-      if (stopped) {
-        return TaskStatus.failure(getId());
-      }
-      createRunner(toolbox);
-    }
+    createRunner(toolbox);
+    registerResourceCloserOnAbnormalExit(config -> runner.stopGracefully());
     return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run());
   }
 
   private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
   {
-    synchronized (this) {
-      if (stopped) {
-        return TaskStatus.failure(getId());
-      }
-      sequentialIndexTask = new IndexTask(
-          getId(),
-          getGroupId(),
-          getTaskResource(),
-          getDataSource(),
-          new IndexIngestionSpec(
-              getIngestionSchema().getDataSchema(),
-              getIngestionSchema().getIOConfig(),
-              convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
-          ),
-          getContext(),
-          authorizerMapper,
-          chatHandlerProvider,
-          rowIngestionMetersFactory,
-          appenderatorsManager
-      );
-    }
+    sequentialIndexTask = new IndexTask(
+        getId(),
+        getGroupId(),
+        getTaskResource(),
+        getDataSource(),
+        new IndexIngestionSpec(
+            getIngestionSchema().getDataSchema(),
+            getIngestionSchema().getIOConfig(),
+            convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
+        ),
+        getContext(),
+        authorizerMapper,
+        chatHandlerProvider,
+        rowIngestionMetersFactory,
+        appenderatorsManager
+    );
     if (sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
+      registerResourceCloserOnAbnormalExit(config -> sequentialIndexTask.stopGracefully(config));
       return sequentialIndexTask.run(toolbox);
     } else {
       return TaskStatus.failure(getId());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
index 7879d87..d501b98 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.Task;
 
@@ -69,6 +70,11 @@ public class TestTasks
     }
 
     @Override
+    public void stopGracefully(TaskConfig taskConfig)
+    {
+    }
+
+    @Override
     public TaskStatus run(TaskToolbox toolbox)
     {
       return TaskStatus.success(getId());
@@ -97,6 +103,11 @@ public class TestTasks
     }
 
     @Override
+    public void stopGracefully(TaskConfig taskConfig)
+    {
+    }
+
+    @Override
     public TaskStatus run(TaskToolbox toolbox) throws Exception
     {
       while (!Thread.currentThread().isInterrupted()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index 42c8641..2ec6b12 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -70,6 +70,11 @@ public class HadoopTaskTest
       }
 
       @Override
+      public void stopGracefully(TaskConfig taskConfig)
+      {
+      }
+
+      @Override
       public boolean requireLockExistingSegments()
       {
         return true;
@@ -95,7 +100,7 @@ public class HadoopTaskTest
       }
 
       @Override
-      public TaskStatus run(TaskToolbox toolbox)
+      public TaskStatus runTask(TaskToolbox toolbox)
       {
         return null;
       }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 7d90a06..8ddf6cc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -40,11 +40,14 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.hamcrest.CoreMatchers;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -52,12 +55,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 
 public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private ExecutorService service;
 
   @Before
@@ -98,7 +105,9 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
       Thread.sleep(100);
     }
     task.stopGracefully(null);
-    Assert.assertEquals(TaskState.FAILED, future.get());
+    expectedException.expect(ExecutionException.class);
+    expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));
+    future.get();
 
     final TestParallelIndexTaskRunner runner = (TestParallelIndexTaskRunner) task.getRunner();
     Assert.assertTrue(runner.getRunningTaskIds().isEmpty());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java
index 8724ba9..4014141 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.SegmentInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.java.util.common.Intervals;
@@ -61,6 +62,11 @@ public class RealtimeishTask extends AbstractTask
   }
 
   @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+  }
+
+  @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     final Interval interval1 = Intervals.of("2010-01-01T00/PT1H");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index f7dbfde..475b8ef 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -948,6 +948,11 @@ public class TaskLifecycleTest
       }
 
       @Override
+      public void stopGracefully(TaskConfig taskConfig)
+      {
+      }
+
+      @Override
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
         final Interval interval = Intervals.of("2012-01-01/P1D");
@@ -991,6 +996,11 @@ public class TaskLifecycleTest
       }
 
       @Override
+      public void stopGracefully(TaskConfig taskConfig)
+      {
+      }
+
+      @Override
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
         final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
@@ -1025,6 +1035,11 @@ public class TaskLifecycleTest
       }
 
       @Override
+      public void stopGracefully(TaskConfig taskConfig)
+      {
+      }
+
+      @Override
       public TaskStatus run(TaskToolbox toolbox) throws Exception
       {
         final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 004c15b..db0fe75 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
@@ -1233,6 +1234,11 @@ public class TaskLockboxTest
     }
 
     @Override
+    public void stopGracefully(TaskConfig taskConfig)
+    {
+    }
+
+    @Override
     public TaskStatus run(TaskToolbox toolbox)
     {
       return TaskStatus.failure("how?");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 3a58c0c..1591925 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -1280,6 +1281,11 @@ public class OverlordResourceTest
       }
 
       @Override
+      public void stopGracefully(TaskConfig taskConfig)
+      {
+      }
+
+      @Override
       public TaskStatus run(TaskToolbox toolbox)
       {
         return null;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 7ad67ec..ce5ac8f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -29,7 +29,6 @@ import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 
 import javax.annotation.Nullable;
-import java.io.Closeable;
 import java.util.Collection;
 import java.util.List;
 
@@ -46,7 +45,7 @@ import java.util.List;
  * all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread.
  * Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads.
  */
-public interface Appenderator extends QuerySegmentWalker, Closeable
+public interface Appenderator extends QuerySegmentWalker
 {
   /**
    * Return the name of the dataSource associated with this Appenderator.
@@ -200,7 +199,6 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
    * pushes to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been
    * persisted.
    */
-  @Override
   void close();
 
   /**
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index b92bd12..fed1d70 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -154,11 +154,12 @@ public class DefaultOfflineAppenderatorFactoryTest
         null
     );
 
-    try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build(
+    Appenderator appenderator = defaultOfflineAppenderatorFactory.build(
         schema,
         tuningConfig,
         new FireDepartmentMetrics()
-    )) {
+    );
+    try {
       Assert.assertEquals("dataSourceName", appenderator.getDataSource());
       Assert.assertEquals(null, appenderator.startJob());
       SegmentIdWithShardSpec identifier = new SegmentIdWithShardSpec(
@@ -175,5 +176,8 @@ public class DefaultOfflineAppenderatorFactoryTest
       appenderator.close();
       Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
     }
+    finally {
+      appenderator.close();
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org