You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/08/13 00:42:16 UTC
[incubator-druid] branch master updated: Add TaskResourceCleaner;
fix a couple of concurrency bugs in batch tasks (#8236)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 312cdc2 Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks (#8236)
312cdc2 is described below
commit 312cdc245239aa1411665e91eaeeadfd13301d25
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Aug 12 17:42:06 2019 -0700
Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks (#8236)
* Add TaskResourceCleaner; fix a couple of concurrency bugs in batch tasks
* kill runner when it's ready
* add comment
* kill run thread
* fix test
* Take closeable out of Appenderator
* add javadoc
* fix test
* fix test
* update javadoc
* add javadoc about killed task
* address comment
* handling missing exceptions
* more clear javadoc for stopGracefully
* update javadoc
* Add missing statement in javadoc
* typo
---
.../common/task/AbstractBatchIndexTask.java | 65 +++++++++++++++++++
.../common/task/AbstractFixedIntervalTask.java | 6 ++
.../druid/indexing/common/task/AbstractTask.java | 11 ----
.../task/AppenderatorDriverRealtimeIndexTask.java | 2 +-
.../druid/indexing/common/task/CompactionTask.java | 39 ++++++++++--
.../indexing/common/task/HadoopIndexTask.java | 55 +++++++---------
.../druid/indexing/common/task/IndexTask.java | 64 +++++++------------
.../druid/indexing/common/task/NoopTask.java | 6 ++
.../apache/druid/indexing/common/task/Task.java | 18 +++++-
.../indexing/common/task/TaskResourceCleaner.java | 73 ++++++++++++++++++++++
.../task/batch/parallel/ParallelIndexSubTask.java | 46 +++++---------
.../parallel/ParallelIndexSupervisorTask.java | 64 ++++++-------------
.../apache/druid/indexing/common/TestTasks.java | 11 ++++
.../druid/indexing/common/task/HadoopTaskTest.java | 7 ++-
.../ParallelIndexSupervisorTaskKillTest.java | 11 +++-
.../druid/indexing/overlord/RealtimeishTask.java | 6 ++
.../druid/indexing/overlord/TaskLifecycleTest.java | 15 +++++
.../druid/indexing/overlord/TaskLockboxTest.java | 6 ++
.../overlord/http/OverlordResourceTest.java | 6 ++
.../realtime/appenderator/Appenderator.java | 4 +-
.../DefaultOfflineAppenderatorFactoryTest.java | 8 ++-
21 files changed, 347 insertions(+), 176 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index b00c646..56dccbe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -22,12 +22,15 @@ package org.apache.druid.indexing.common.task;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.indexing.firehose.WindowedSegmentId;
import org.apache.druid.java.util.common.JodaUtils;
@@ -43,6 +46,7 @@ import org.joda.time.Interval;
import org.joda.time.Period;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -50,6 +54,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -64,12 +69,18 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
private final SegmentLockHelper segmentLockHelper;
+ @GuardedBy("this")
+ private final TaskResourceCleaner resourceCloserOnAbnormalExit = new TaskResourceCleaner();
+
/**
* State to indicate that this task will use segmentLock or timeChunkLock.
* This is automatically set when {@link #determineLockGranularityandTryLock} is called.
*/
private boolean useSegmentLock;
+ @GuardedBy("this")
+ private boolean stopped = false;
+
protected AbstractBatchIndexTask(String id, String dataSource, Map<String, Object> context)
{
super(id, dataSource, context);
@@ -89,6 +100,60 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
}
/**
+ * Run this task. Before running the task, it checks whether the current task is already stopped and
+ * registers a cleaner to interrupt the thread running this task on abnormal exits.
+ *
+ * @see #runTask(TaskToolbox)
+ * @see #stopGracefully(TaskConfig)
+ */
+ @Override
+ public TaskStatus run(TaskToolbox toolbox) throws Exception
+ {
+ synchronized (this) {
+ if (stopped) {
+ return TaskStatus.failure(getId());
+ } else {
+ // Register the cleaner to interrupt the current thread first.
+ // Since the resource closer cleans up the registered resources in LIFO order,
+ // this will be executed last on abnormal exits.
+ // The order is sometimes important. For example, Appenderator has two methods of close() and closeNow(), and
+ // closeNow() is supposed to be called on abnormal exits. Interrupting the current thread could lead to close()
+ // to be called indirectly, e.g., for Appenderators in try-with-resources. In this case, closeNow() should be
+ // called before the current thread is interrupted, so that subsequent close() calls can be ignored.
+ final Thread currentThread = Thread.currentThread();
+ resourceCloserOnAbnormalExit.register(config -> currentThread.interrupt());
+ }
+ }
+ return runTask(toolbox);
+ }
+
+ @Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ synchronized (this) {
+ stopped = true;
+ resourceCloserOnAbnormalExit.clean(taskConfig);
+ }
+ }
+
+ /**
+ * Registers a resource cleaner which is executed on abnormal exits.
+ *
+ * @see Task#stopGracefully
+ */
+ protected void registerResourceCloserOnAbnormalExit(Consumer<TaskConfig> cleaner)
+ {
+ synchronized (this) {
+ resourceCloserOnAbnormalExit.register(cleaner);
+ }
+ }
+
+ /**
+ * The method to actually process this task. This method is executed in {@link #run(TaskToolbox)}.
+ */
+ public abstract TaskStatus runTask(TaskToolbox toolbox) throws Exception;
+
+ /**
* Return true if this task can overwrite existing segments.
*/
public abstract boolean requireLockExistingSegments();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
index 9582b98..3b117de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractFixedIntervalTask.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.joda.time.Interval;
import java.util.Map;
@@ -80,4 +81,9 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
{
return interval;
}
+
+ @Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index 370b30e..c901e68 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
@@ -154,16 +153,6 @@ public abstract class AbstractTask implements Task
return false;
}
- /**
- * Should be called independent of canRestore so that resource cleaning can be achieved.
- * If resource cleaning is required, concrete class should override this method
- */
- @Override
- public void stopGracefully(TaskConfig taskConfig)
- {
- // Do nothing and let the concrete class handle it
- }
-
@Override
public String toString()
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 3c22d20..564c965 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -437,7 +437,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
}
CloseQuietly.close(firehose);
- CloseQuietly.close(appenderator);
+ appenderator.close();
CloseQuietly.close(driver);
toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 8aee62f..4e550f4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -100,6 +100,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
@@ -109,6 +110,12 @@ public class CompactionTask extends AbstractBatchIndexTask
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
+ /**
+ * A flag to indicate this task is already stopped and its child indexTasks shouldn't be created.
+ * See {@link #currentRunningTaskSpec} for more details.
+ */
+ private static final Object SPECIAL_VALUE_STOPPED = new Object();
+
private final Interval interval;
private final List<DataSegment> segments;
@Nullable
@@ -146,10 +153,22 @@ public class CompactionTask extends AbstractBatchIndexTask
private final RetryPolicyFactory retryPolicyFactory;
@JsonIgnore
- private List<IndexTask> indexTaskSpecs;
+ private final AppenderatorsManager appenderatorsManager;
@JsonIgnore
- private AppenderatorsManager appenderatorsManager;
+ private List<IndexTask> indexTaskSpecs;
+
+ /**
+ * Reference to the sub-task that is currently running.
+ *
+ * When {@link #stopGracefully} is called, the compaction task gets the reference to the current running task,
+ * and calls {@link #stopGracefully} for that task. This reference will be updated to {@link #SPECIAL_VALUE_STOPPED}.
+ *
+ * Note that {@link #stopGracefully} can be called at any time during {@link #run}. Calling {@link #stopGracefully}
+ * on the current running task and setting this reference to {@link #SPECIAL_VALUE_STOPPED} should be done atomically.
+ */
+ @Nullable
+ private final AtomicReference<Object> currentRunningTaskSpec = new AtomicReference<>();
@JsonCreator
public CompactionTask(
@@ -289,7 +308,7 @@ public class CompactionTask extends AbstractBatchIndexTask
}
@Override
- public TaskStatus run(final TaskToolbox toolbox) throws Exception
+ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (indexTaskSpecs == null) {
final List<IndexIngestionSpec> ingestionSpecs = createIngestionSchema(
@@ -330,12 +349,24 @@ public class CompactionTask extends AbstractBatchIndexTask
log.info("Generated [%d] compaction task specs", totalNumSpecs);
int failCnt = 0;
+ registerResourceCloserOnAbnormalExit(config -> {
+ Object currentRunningTask = currentRunningTaskSpec.getAndSet(SPECIAL_VALUE_STOPPED);
+ if (currentRunningTask != null) {
+ ((IndexTask) currentRunningTask).stopGracefully(config);
+ }
+ });
for (IndexTask eachSpec : indexTaskSpecs) {
+ Object prevSpec = currentRunningTaskSpec.get();
+ //noinspection ObjectEquality
+ if (prevSpec == SPECIAL_VALUE_STOPPED || !currentRunningTaskSpec.compareAndSet(prevSpec, eachSpec)) {
+ log.info("Task is asked to stop. Finish as failed.");
+ return TaskStatus.failure(getId());
+ }
final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
- log.info("Running indexSpec: " + json);
try {
if (eachSpec.isReady(toolbox.getTaskActionClient())) {
+ log.info("Running indexSpec: " + json);
final TaskStatus eachResult = eachSpec.run(toolbox);
if (!eachResult.isSuccess()) {
failCnt++;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 7557242..86ac882 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -128,13 +128,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@JsonIgnore
private String errorMsg;
- @JsonIgnore
- private Thread runThread;
-
- @JsonIgnore
- private boolean stopped = false;
-
-
/**
* @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
* for creating Druid index segments. It may be modified.
@@ -264,22 +257,24 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
return classpathPrefix;
}
- public String getHadoopJobIdFileName()
+ private String getHadoopJobIdFileName()
{
- return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME).getAbsolutePath();
+ return getHadoopJobIdFile().getAbsolutePath();
}
- @Override
- public TaskStatus run(TaskToolbox toolbox)
+ private boolean hadoopJobIdFileExists()
{
- synchronized (this) {
- if (stopped) {
- return TaskStatus.failure(getId());
- } else {
- runThread = Thread.currentThread();
- }
- }
+ return getHadoopJobIdFile().exists();
+ }
+ private File getHadoopJobIdFile()
+ {
+ return new File(taskConfig.getTaskDir(getId()), HADOOP_JOB_ID_FILENAME);
+ }
+
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox)
+ {
try {
taskConfig = toolbox.getConfig();
if (chatHandlerProvider.isPresent()) {
@@ -319,6 +314,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
@SuppressWarnings("unchecked")
private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
{
+ registerResourceCloserOnAbnormalExit(config -> killHadoopJob());
String hadoopJobIdFile = getHadoopJobIdFileName();
final ClassLoader loader = buildClassLoader(toolbox);
boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
@@ -475,33 +471,25 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
}
- @Override
- public void stopGracefully(TaskConfig taskConfig)
+ private void killHadoopJob()
{
- synchronized (this) {
- stopped = true;
- if (runThread == null) {
- // didn't actually start, just return
- return;
- }
- }
// To avoid issue of kill command once the ingestion task is actually completed
- if (!ingestionState.equals(IngestionState.COMPLETED)) {
+ if (hadoopJobIdFileExists() && !ingestionState.equals(IngestionState.COMPLETED)) {
final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
String hadoopJobIdFile = getHadoopJobIdFileName();
try {
- ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
- taskConfig.getDefaultHadoopCoordinates());
+ ClassLoader loader = HadoopTask.buildClassLoader(
+ getHadoopDependencyCoordinates(),
+ taskConfig.getDefaultHadoopCoordinates()
+ );
Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
loader
);
- String[] buildKillJobInput = new String[]{
- hadoopJobIdFile
- };
+ String[] buildKillJobInput = new String[]{hadoopJobIdFile};
Class<?> buildKillJobRunnerClass = killMRJobInnerProcessingRunner.getClass();
Method innerProcessingRunTask = buildKillJobRunnerClass.getMethod("runTask", buildKillJobInput.getClass());
@@ -519,7 +507,6 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
}
finally {
Thread.currentThread().setContextClassLoader(oldLoader);
- runThread.interrupt();
}
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 16254de..c314cef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -51,7 +51,6 @@ import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
@@ -178,15 +177,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@JsonIgnore
private final AppenderatorsManager appenderatorsManager;
- @JsonIgnore
- private Thread runThread;
-
- @JsonIgnore
- private boolean stopped = false;
-
- @JsonIgnore
- private Appenderator appenderator;
-
@JsonCreator
public IndexTask(
@JsonProperty("id") final String id,
@@ -421,16 +411,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
@Override
- public TaskStatus run(final TaskToolbox toolbox)
+ public TaskStatus runTask(final TaskToolbox toolbox)
{
- synchronized (this) {
- if (stopped) {
- return TaskStatus.failure(getId());
- } else {
- runThread = Thread.currentThread();
- }
- }
-
try {
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@@ -513,23 +495,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
- @Override
- public void stopGracefully(TaskConfig taskConfig)
- {
- synchronized (this) {
- stopped = true;
- // Nothing else to do for native batch except terminate
- if (ingestionState != IngestionState.COMPLETED) {
- if (appenderator != null) {
- appenderator.closeNow();
- }
- if (runThread != null) {
- runThread.interrupt();
- }
- }
- }
- }
-
private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
@@ -925,17 +890,17 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
toolbox.getTaskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));
+ final Appenderator appenderator = newAppenderator(
+ buildSegmentsFireDepartmentMetrics,
+ toolbox,
+ dataSchema,
+ tuningConfig
+ );
+ boolean exceptionOccurred = false;
try (
- final Appenderator appenderator = newAppenderator(
- buildSegmentsFireDepartmentMetrics,
- toolbox,
- dataSchema,
- tuningConfig
- );
final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
) {
- this.appenderator = appenderator;
driver.startJob();
@@ -1007,6 +972,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
: null;
// Probably we can publish atomicUpdateGroup along with segments.
final SegmentsAndMetadata published = awaitPublish(driver.publishAll(inputSegments, publisher), pushTimeout);
+ appenderator.close();
ingestionState = IngestionState.COMPLETED;
if (published == null) {
@@ -1031,8 +997,20 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
catch (TimeoutException | ExecutionException e) {
+ exceptionOccurred = true;
throw new RuntimeException(e);
}
+ catch (Exception e) {
+ exceptionOccurred = true;
+ throw e;
+ }
+ finally {
+ if (exceptionOccurred) {
+ appenderator.closeNow();
+ } else {
+ appenderator.close();
+ }
+ }
}
private void handleParseException(ParseException e)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index 1bbe4ee..8fcf252 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -139,6 +140,11 @@ public class NoopTask extends AbstractTask
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
if (firehoseFactory != null) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 45aabd8..097bf74 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -161,8 +161,22 @@ public interface Task
boolean canRestore();
/**
- * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be terminated with
- * extreme prejudice.
+ * Asks a task to arrange for its "run" method to exit promptly. Tasks that take too long to stop gracefully will be
+ * terminated with extreme prejudice.
+ *
+ * This method can be called at any time no matter when {@link #run} is executed. Regardless of when this method is
+ * called with respect to {@link #run}, its implementations must not allow a resource leak or lingering executions
+ * (local or remote).
+ *
+ * Depending on the task executor type, one of the two cases below can happen when the task is killed.
+ * - When the task is executed by a middleManager, {@link org.apache.druid.indexing.overlord.ForkingTaskRunner} kills
+ * the process running the task, which triggers
+ * {@link org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner#stop}.
+ * - When the task is executed by an indexer, {@link org.apache.druid.indexing.overlord.ThreadingTaskRunner#shutdown}
+ * calls this method directly.
+ *
+ * If the task has some resources to clean up on abnormal exit, e.g., sub tasks of parallel indexing task
+ * or Hadoop jobs spawned by Hadoop indexing tasks, those resource cleanups should be done in this method.
*
* @param taskConfig TaskConfig for this task
*/
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java
new file mode 100644
index 0000000..a96cd70
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskResourceCleaner.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import org.apache.druid.indexing.common.config.TaskConfig;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.function.Consumer;
+
+/**
+ * Executes all registered {@link Consumer}s in LIFO order.
+ * Similar to {@link org.apache.druid.java.util.common.io.Closer}, but this class is tweaked to be used in
+ * {@link Task#stopGracefully(TaskConfig)}.
+ */
+public class TaskResourceCleaner
+{
+ private final Deque<Consumer<TaskConfig>> stack = new ArrayDeque<>(4);
+
+ public void register(Consumer<TaskConfig> cleaner)
+ {
+ stack.addFirst(cleaner);
+ }
+
+ public void clean(TaskConfig config)
+ {
+ Throwable throwable = null;
+
+ // Clean up in LIFO order
+ while (!stack.isEmpty()) {
+ final Consumer<TaskConfig> cleaner = stack.removeFirst();
+ try {
+ cleaner.accept(config);
+ }
+ catch (Throwable t) {
+ if (throwable == null) {
+ throwable = t;
+ } else {
+ suppress(throwable, t);
+ }
+ }
+ }
+
+ if (throwable != null) {
+ throw new RuntimeException(throwable);
+ }
+ }
+
+ private void suppress(Throwable thrown, Throwable suppressed)
+ {
+ //noinspection ObjectEquality
+ if (thrown != suppressed) {
+ thrown.addSuppressed(suppressed);
+ }
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index fb902cb..3e3cf80 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SurrogateAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
import org.apache.druid.indexing.common.task.IndexTask;
@@ -107,10 +106,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
private final IndexTaskClientFactory<ParallelIndexTaskClient> taskClientFactory;
private final AppenderatorsManager appenderatorsManager;
- private Appenderator appenderator;
- private Thread runThread;
- private boolean stopped = false;
-
/**
* If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode".
* In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks.
@@ -205,16 +200,8 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
}
@Override
- public TaskStatus run(final TaskToolbox toolbox) throws Exception
+ public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
{
- synchronized (this) {
- if (stopped) {
- return TaskStatus.failure(getId());
- } else {
- runThread = Thread.currentThread();
- }
- }
-
if (missingIntervalsInOverwriteMode) {
LOG.warn(
"Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
@@ -425,12 +412,12 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient);
+ final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig);
+ boolean exceptionOccurred = false;
try (
- final Appenderator appenderator = newAppenderator(fireDepartmentMetrics, toolbox, dataSchema, tuningConfig);
final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
) {
- this.appenderator = appenderator;
driver.startJob();
final Set<DataSegment> pushedSegments = new HashSet<>();
@@ -496,12 +483,25 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
pushedSegments.addAll(pushed.getSegments());
LOG.info("Pushed segments[%s]", pushed.getSegments());
+ appenderator.close();
return pushedSegments;
}
catch (TimeoutException | ExecutionException e) {
+ exceptionOccurred = true;
throw new RuntimeException(e);
}
+ catch (Exception e) {
+ exceptionOccurred = true;
+ throw e;
+ }
+ finally {
+ if (exceptionOccurred) {
+ appenderator.closeNow();
+ } else {
+ appenderator.close();
+ }
+ }
}
private Appenderator newAppenderator(
@@ -536,18 +536,4 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
toolbox.getDataSegmentKiller()
);
}
-
- @Override
- public void stopGracefully(TaskConfig taskConfig)
- {
- synchronized (this) {
- stopped = true;
- if (appenderator != null) {
- appenderator.closeNow();
- }
- if (runThread != null) {
- runThread.interrupt();
- }
- }
- }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index c8cd3d9..e6ef2c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -38,7 +38,6 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
-import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
import org.apache.druid.indexing.common.task.IndexTask;
@@ -128,8 +127,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private volatile ParallelIndexTaskRunner runner;
private volatile IndexTask sequentialIndexTask;
- private boolean stopped = false;
-
// toolbox is initialized when run() is called, and can be used for processing HTTP endpoint requests.
private volatile TaskToolbox toolbox;
@@ -287,20 +284,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
}
@Override
- public void stopGracefully(TaskConfig taskConfig)
- {
- synchronized (this) {
- stopped = true;
- }
- if (runner != null) {
- runner.stopGracefully();
- } else if (sequentialIndexTask != null) {
- sequentialIndexTask.stopGracefully(taskConfig);
- }
- }
-
- @Override
- public TaskStatus run(TaskToolbox toolbox) throws Exception
+ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
if (missingIntervalsInOverwriteMode) {
LOG.warn(
@@ -357,39 +341,31 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
private TaskStatus runParallel(TaskToolbox toolbox) throws Exception
{
- synchronized (this) {
- if (stopped) {
- return TaskStatus.failure(getId());
- }
- createRunner(toolbox);
- }
+ createRunner(toolbox);
+ registerResourceCloserOnAbnormalExit(config -> runner.stopGracefully());
return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run());
}
private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
{
- synchronized (this) {
- if (stopped) {
- return TaskStatus.failure(getId());
- }
- sequentialIndexTask = new IndexTask(
- getId(),
- getGroupId(),
- getTaskResource(),
- getDataSource(),
- new IndexIngestionSpec(
- getIngestionSchema().getDataSchema(),
- getIngestionSchema().getIOConfig(),
- convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
- ),
- getContext(),
- authorizerMapper,
- chatHandlerProvider,
- rowIngestionMetersFactory,
- appenderatorsManager
- );
- }
+ sequentialIndexTask = new IndexTask(
+ getId(),
+ getGroupId(),
+ getTaskResource(),
+ getDataSource(),
+ new IndexIngestionSpec(
+ getIngestionSchema().getDataSchema(),
+ getIngestionSchema().getIOConfig(),
+ convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
+ ),
+ getContext(),
+ authorizerMapper,
+ chatHandlerProvider,
+ rowIngestionMetersFactory,
+ appenderatorsManager
+ );
if (sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
+ registerResourceCloserOnAbnormalExit(config -> sequentialIndexTask.stopGracefully(config));
return sequentialIndexTask.run(toolbox);
} else {
return TaskStatus.failure(getId());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
index 7879d87..d501b98 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestTasks.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.Task;
@@ -69,6 +70,11 @@ public class TestTasks
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox)
{
return TaskStatus.success(getId());
@@ -97,6 +103,11 @@ public class TestTasks
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
while (!Thread.currentThread().isInterrupted()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index 42c8641..2ec6b12 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -70,6 +70,11 @@ public class HadoopTaskTest
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public boolean requireLockExistingSegments()
{
return true;
@@ -95,7 +100,7 @@ public class HadoopTaskTest
}
@Override
- public TaskStatus run(TaskToolbox toolbox)
+ public TaskStatus runTask(TaskToolbox toolbox)
{
return null;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 7d90a06..8ddf6cc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -40,11 +40,14 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -52,12 +55,16 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Stream;
public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSupervisorTaskTest
{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private ExecutorService service;
@Before
@@ -98,7 +105,9 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
Thread.sleep(100);
}
task.stopGracefully(null);
- Assert.assertEquals(TaskState.FAILED, future.get());
+ expectedException.expect(ExecutionException.class);
+ expectedException.expectCause(CoreMatchers.instanceOf(InterruptedException.class));
+ future.get();
final TestParallelIndexTaskRunner runner = (TestParallelIndexTaskRunner) task.getRunner();
Assert.assertTrue(runner.getRunningTaskIds().isEmpty());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java
index 8724ba9..4014141 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.java.util.common.Intervals;
@@ -61,6 +62,11 @@ public class RealtimeishTask extends AbstractTask
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Interval interval1 = Intervals.of("2010-01-01T00/PT1H");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index f7dbfde..475b8ef 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -948,6 +948,11 @@ public class TaskLifecycleTest
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Interval interval = Intervals.of("2012-01-01/P1D");
@@ -991,6 +996,11 @@ public class TaskLifecycleTest
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
@@ -1025,6 +1035,11 @@ public class TaskLifecycleTest
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final TaskLock myLock = Iterables.getOnlyElement(toolbox.getTaskActionClient().submit(new LockListAction()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 004c15b..db0fe75 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
@@ -1233,6 +1234,11 @@ public class TaskLockboxTest
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox)
{
return TaskStatus.failure("how?");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index 3a58c0c..1591925 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
@@ -1280,6 +1281,11 @@ public class OverlordResourceTest
}
@Override
+ public void stopGracefully(TaskConfig taskConfig)
+ {
+ }
+
+ @Override
public TaskStatus run(TaskToolbox toolbox)
{
return null;
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
index 7ad67ec..ce5ac8f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java
@@ -29,7 +29,6 @@ import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.util.Collection;
import java.util.List;
@@ -46,7 +45,7 @@ import java.util.List;
* all methods of the data appending and indexing lifecycle except {@link #drop} must be called from a single thread.
* Methods inherited from {@link QuerySegmentWalker} can be called concurrently from multiple threads.
*/
-public interface Appenderator extends QuerySegmentWalker, Closeable
+public interface Appenderator extends QuerySegmentWalker
{
/**
* Return the name of the dataSource associated with this Appenderator.
@@ -200,7 +199,6 @@ public interface Appenderator extends QuerySegmentWalker, Closeable
* pushes to finish. This will not remove any on-disk persisted data, but it will drop any data that has not yet been
* persisted.
*/
- @Override
void close();
/**
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index b92bd12..fed1d70 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -154,11 +154,12 @@ public class DefaultOfflineAppenderatorFactoryTest
null
);
- try (Appenderator appenderator = defaultOfflineAppenderatorFactory.build(
+ Appenderator appenderator = defaultOfflineAppenderatorFactory.build(
schema,
tuningConfig,
new FireDepartmentMetrics()
- )) {
+ );
+ try {
Assert.assertEquals("dataSourceName", appenderator.getDataSource());
Assert.assertEquals(null, appenderator.startJob());
SegmentIdWithShardSpec identifier = new SegmentIdWithShardSpec(
@@ -175,5 +176,8 @@ public class DefaultOfflineAppenderatorFactoryTest
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
+ finally {
+ appenderator.close();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org