You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by hi...@apache.org on 2019/07/30 00:06:45 UTC

[incubator-druid] branch master updated: Add CliIndexer process type and initial task runner implementation (#8107)

This is an automated email from the ASF dual-hosted git repository.

himanshug pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 640b7af  Add CliIndexer process type and initial task runner implementation (#8107)
640b7af is described below

commit 640b7afc1cee911a27de7bf938dda24a85ba1510
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Mon Jul 29 17:06:33 2019 -0700

    Add CliIndexer process type and initial task runner implementation (#8107)
    
    * Add CliIndexer process type and initial task runner implementation
    
    * Fix HttpRemoteTaskRunnerTest
    
    * Remove batch sanity check on PeonAppenderatorsManager
    
    * Fix paralle index tests
    
    * PR comments
    
    * Adjust Jersey resource logging
    
    * Additional cleanup
    
    * Fix SystemSchemaTest
    
    * Add comment to LocalDataSegmentPusherTest absolute path test
    
    * More PR comments
    
    * Use Server annotated with RemoteChatHandler
    
    * More PR comments
    
    * Checkstyle
    
    * PR comments
    
    * Add task shutdown to stopGracefully
    
    * Small cleanup
    
    * Compile fix
    
    * Address PR comments
    
    * Adjust TaskReportFileWriter and fix nits
    
    * Remove unnecessary closer
    
    * More PR comments
    
    * Minor adjustments
    
    * PR comments
    
    * ThreadingTaskRunner: cancel  task run future not shutdownFuture and remove thread from workitem
---
 .../main/java/org/apache/druid/guice/Jerseys.java  |   4 +
 docs/content/querying/lookups.md                   |   2 +
 .../druid/security/basic/CommonCacheNotifier.java  |   3 +-
 .../IncrementalPublishingKafkaIndexTaskRunner.java |   3 +
 .../druid/indexing/kafka/KafkaIndexTask.java       |   8 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   3 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  12 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |   4 +-
 .../druid/indexing/kinesis/KinesisIndexTask.java   |   8 +-
 .../indexing/kinesis/KinesisIndexTaskRunner.java   |   3 +
 .../kinesis/supervisor/KinesisSupervisor.java      |   3 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  19 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   4 +-
 ....java => MultipleFileTaskReportFileWriter.java} |  31 +-
 ...er.java => SingleFileTaskReportFileWriter.java} |  13 +-
 .../indexing/common/TaskReportFileWriter.java      |  34 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |  39 +-
 .../druid/indexing/common/task/CompactionTask.java |  20 +-
 .../indexing/common/task/HadoopIndexTask.java      |  34 +-
 .../druid/indexing/common/task/IndexTask.java      |  63 ++-
 .../indexing/common/task/RealtimeIndexTask.java    |   8 +
 .../task/batch/parallel/ParallelIndexSubTask.java  |  41 +-
 .../batch/parallel/ParallelIndexSubTaskSpec.java   |   4 +-
 .../parallel/ParallelIndexSupervisorTask.java      |  61 ++-
 .../overlord/BaseRestorableTaskRunner.java         | 209 +++++++++
 .../druid/indexing/overlord/ForkingTaskRunner.java | 217 ++-------
 .../indexing/overlord/ThreadingTaskRunner.java     | 521 +++++++++++++++++++++
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |   6 +-
 .../seekablestream/SeekableStreamIndexTask.java    |  12 +-
 .../SeekableStreamIndexTaskRunner.java             |  26 +-
 .../druid/indexing/worker/WorkerTaskManager.java   |   3 +-
 .../druid/indexing/common/TaskToolboxTest.java     |   4 +-
 .../apache/druid/indexing/common/TestUtils.java    |   3 +
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |  11 +-
 .../common/task/CompactionTaskRunTest.java         |  32 +-
 .../indexing/common/task/CompactionTaskTest.java   |  17 +-
 .../druid/indexing/common/task/IndexTaskTest.java  |  70 ++-
 .../indexing/common/task/IngestionTestBase.java    |   4 +-
 ...iter.java => NoopTestTaskReportFileWriter.java} |  10 +-
 .../common/task/RealtimeIndexTaskTest.java         |   2 +-
 .../druid/indexing/common/task/TaskSerdeTest.java  |   6 +-
 .../common/task/TestAppenderatorsManager.java      | 147 ++++++
 .../AbstractParallelIndexSupervisorTaskTest.java   |   8 +-
 .../ParallelIndexSupervisorTaskKillTest.java       |   4 +-
 .../ParallelIndexSupervisorTaskResourceTest.java   |   4 +-
 .../ParallelIndexSupervisorTaskSerdeTest.java      |   4 +-
 .../parallel/ParallelIndexSupervisorTaskTest.java  |   7 +-
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   4 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |  19 +-
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    |  14 +-
 .../SeekableStreamSupervisorStateTest.java         |   3 +-
 .../indexing/worker/WorkerTaskManagerTest.java     |   4 +-
 .../indexing/worker/WorkerTaskMonitorTest.java     |   4 +-
 .../discovery/DruidNodeDiscoveryProvider.java      |   6 +-
 .../java/org/apache/druid/discovery/NodeType.java  |   3 +-
 .../apache/druid/query/lookup/LookupModule.java    |   3 +-
 ...DataSegmentServerAnnouncerLifecycleHandler.java | 104 ++++
 .../realtime/appenderator/AppenderatorImpl.java    |  75 ++-
 .../appenderator/AppenderatorsManager.java         | 130 +++++
 .../DummyForInjectionAppenderatorsManager.java     | 120 +++++
 .../appenderator/PeonAppenderatorsManager.java     | 170 +++++++
 .../appenderator/SinkQuerySegmentWalker.java       |   6 +
 .../UnifiedIndexerAppenderatorsManager.java        | 199 ++++++++
 .../realtime/firehose/ChatHandlerResource.java     |   4 +-
 .../apache/druid/server/http/ClusterResource.java  |   5 +
 .../druid/server/initialization/ServerConfig.java  |  47 +-
 .../jetty/ChatHandlerServerModule.java             |   6 +-
 ...rverModule.java => CliIndexerServerModule.java} |  68 ++-
 .../server/initialization/jetty/JettyBindings.java |  29 +-
 .../initialization/jetty/JettyServerInitUtils.java |  15 +-
 .../initialization/jetty/ServletFilterHolder.java  |  14 +-
 .../initialization/ServerConfigSerdeTest.java      |  59 +++
 .../loading/LocalDataSegmentPusherTest.java        |   2 +
 .../cli/{CliMiddleManager.java => CliIndexer.java} | 125 ++---
 .../org/apache/druid/cli/CliMiddleManager.java     |  23 +-
 .../java/org/apache/druid/cli/CliOverlord.java     |   7 +
 .../main/java/org/apache/druid/cli/CliPeon.java    | 223 +++++----
 .../src/main/java/org/apache/druid/cli/Main.java   |   1 +
 .../druid/sql/calcite/schema/SystemSchemaTest.java |  43 +-
 79 files changed, 2695 insertions(+), 591 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/guice/Jerseys.java b/core/src/main/java/org/apache/druid/guice/Jerseys.java
index 08f09bd..5a9ab5b 100644
--- a/core/src/main/java/org/apache/druid/guice/Jerseys.java
+++ b/core/src/main/java/org/apache/druid/guice/Jerseys.java
@@ -24,14 +24,18 @@ import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
 import org.apache.druid.guice.annotations.JSR311Resource;
 import org.apache.druid.guice.annotations.PublicApi;
+import org.apache.druid.java.util.common.logger.Logger;
 
 /**
  */
 @PublicApi
 public class Jerseys
 {
+  private static final Logger LOG = new Logger(Jerseys.class);
+
   public static void addResource(Binder binder, Class<?> resourceClazz)
   {
+    LOG.info("Adding Jersey resource: " + resourceClazz.getName());
     Multibinder.newSetBinder(binder, new TypeLiteral<Class<?>>(){}, JSR311Resource.class)
                .addBinding()
                .toInstance(resourceClazz);
diff --git a/docs/content/querying/lookups.md b/docs/content/querying/lookups.md
index 7af2bcd..aa20d64 100644
--- a/docs/content/querying/lookups.md
+++ b/docs/content/querying/lookups.md
@@ -132,6 +132,8 @@ The configuration is propagated to the query serving processes (Broker / Router
 The query serving processes have an internal API for managing lookups on the process and those are used by the Coordinator.
 The Coordinator periodically checks if any of the processes need to load/drop lookups and updates them appropriately.
 
+Please note that only 2 simultaneous lookup configuration propagation requests can be concurrently handled by a single query serving process. This limit is applied to prevent lookup handling from consuming too many server HTTP connections.
+
 # API for configuring lookups
 
 ## Bulk update
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
index f1e61eb..98de383 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
@@ -68,7 +68,8 @@ public class CommonCacheNotifier
       NodeType.HISTORICAL,
       NodeType.PEON,
       NodeType.ROUTER,
-      NodeType.MIDDLE_MANAGER
+      NodeType.MIDDLE_MANAGER,
+      NodeType.INDEXER
   );
 
   private final DruidNodeDiscoveryProvider discoveryProvider;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index c77725d..4498787 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -37,6 +37,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CircularBuffer;
@@ -72,6 +73,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
       Optional<ChatHandlerProvider> chatHandlerProvider,
       CircularBuffer<Throwable> savedParseExceptions,
       RowIngestionMetersFactory rowIngestionMetersFactory,
+      AppenderatorsManager appenderatorsManager,
       LockGranularity lockGranularityToUse
   )
   {
@@ -82,6 +84,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd
         chatHandlerProvider,
         savedParseExceptions,
         rowIngestionMetersFactory,
+        appenderatorsManager,
         lockGranularityToUse
     );
     this.task = task;
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 25140d8..fa900d2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -63,7 +64,8 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject AuthorizerMapper authorizerMapper,
       @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
-      @JacksonInject ObjectMapper configMapper
+      @JacksonInject ObjectMapper configMapper,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -76,7 +78,8 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
         chatHandlerProvider,
         authorizerMapper,
         rowIngestionMetersFactory,
-        getFormattedGroupId(dataSchema.getDataSource(), TYPE)
+        getFormattedGroupId(dataSchema.getDataSource(), TYPE),
+        appenderatorsManager
     );
     this.configMapper = configMapper;
     this.ioConfig = ioConfig;
@@ -136,6 +139,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
         chatHandlerProvider,
         savedParseExceptions,
         rowIngestionMetersFactory,
+        appenderatorsManager,
         lockGranularityToUse
     );
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e9c7d2a..d6fb3c7 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -258,7 +258,8 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
           null,
           null,
           rowIngestionMetersFactory,
-          sortingMapper
+          sortingMapper,
+          null
       ));
     }
     return taskList;
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 87f522e..9702493 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -51,9 +51,9 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskReport;
-import org.apache.druid.indexing.common.TaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -68,6 +68,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.test.TestBroker;
@@ -142,6 +143,7 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.ExpressionTransform;
@@ -230,6 +232,7 @@ public class KafkaIndexTaskTest
   private Long maxTotalRows = null;
   private Period intermediateHandoffPeriod = null;
 
+  private AppenderatorsManager appenderatorsManager;
   private TaskToolboxFactory toolboxFactory;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private TaskStorage taskStorage;
@@ -372,6 +375,7 @@ public class KafkaIndexTaskTest
     topic = getTopicName();
     records = generateRecords(topic);
     reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
+    appenderatorsManager = new TestAppenderatorsManager();
     makeToolboxFactory();
   }
 
@@ -2534,7 +2538,8 @@ public class KafkaIndexTaskTest
         null,
         null,
         rowIngestionMetersFactory,
-        OBJECT_MAPPER
+        OBJECT_MAPPER,
+        appenderatorsManager
     );
     task.setPollRetryMs(POLL_RETRY_MS);
     return task;
@@ -2701,6 +2706,7 @@ public class KafkaIndexTaskTest
     final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
     dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
     final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
+
     toolboxFactory = new TaskToolboxFactory(
         taskConfig,
         taskActionClientFactory,
@@ -2726,7 +2732,7 @@ public class KafkaIndexTaskTest
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),
         new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
-        new TaskReportFileWriter(reportsFile)
+        new SingleFileTaskReportFileWriter(reportsFile)
     );
   }
 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 6474c22..8a9571f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -82,6 +82,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
+import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -3556,7 +3557,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         rowIngestionMetersFactory,
-        objectMapper
+        objectMapper,
+        new DummyForInjectionAppenderatorsManager()
     );
   }
 
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index 3490ee9..b2209ea 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 
@@ -50,7 +51,8 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject AuthorizerMapper authorizerMapper,
       @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
-      @JacksonInject AWSCredentialsConfig awsCredentialsConfig
+      @JacksonInject AWSCredentialsConfig awsCredentialsConfig,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -63,7 +65,8 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
         chatHandlerProvider,
         authorizerMapper,
         rowIngestionMetersFactory,
-        getFormattedGroupId(dataSchema.getDataSource(), TYPE)
+        getFormattedGroupId(dataSchema.getDataSource(), TYPE),
+        appenderatorsManager
     );
     this.awsCredentialsConfig = awsCredentialsConfig;
   }
@@ -79,6 +82,7 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String>
         chatHandlerProvider,
         savedParseExceptions,
         rowIngestionMetersFactory,
+        appenderatorsManager,
         lockGranularityToUse
     );
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
index 3e88bfd..c46d223 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java
@@ -37,6 +37,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CircularBuffer;
@@ -66,6 +67,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
       Optional<ChatHandlerProvider> chatHandlerProvider,
       CircularBuffer<Throwable> savedParseExceptions,
       RowIngestionMetersFactory rowIngestionMetersFactory,
+      AppenderatorsManager appenderatorsManager,
       LockGranularity lockGranularityToUse
   )
   {
@@ -76,6 +78,7 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String
         chatHandlerProvider,
         savedParseExceptions,
         rowIngestionMetersFactory,
+        appenderatorsManager,
         lockGranularityToUse
     );
     this.task = task;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 5a0c861..0e919b3 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -173,7 +173,8 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
           null,
           null,
           rowIngestionMetersFactory,
-          awsCredentialsConfig
+          awsCredentialsConfig,
+          null
       ));
     }
     return taskList;
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 006677b..09d0b45 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -57,9 +57,9 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskReport;
-import org.apache.druid.indexing.common.TaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -75,6 +75,7 @@ import org.apache.druid.indexing.common.task.IndexTaskTest;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -141,6 +142,7 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -231,6 +233,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   private final Period intermediateHandoffPeriod = null;
   private int maxRecordsPerPoll;
 
+  private AppenderatorsManager appenderatorsManager;
   private TaskToolboxFactory toolboxFactory;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private TaskStorage taskStorage;
@@ -316,6 +319,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
 
     recordSupplier = mock(KinesisRecordSupplier.class);
 
+    appenderatorsManager = new TestAppenderatorsManager();
+
     // sleep required because of kinesalite
     Thread.sleep(500);
     makeToolboxFactory();
@@ -2720,7 +2725,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         null,
         null,
         rowIngestionMetersFactory,
-        null
+        null,
+        appenderatorsManager
     );
   }
 
@@ -2880,6 +2886,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
     dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
     final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
+
     toolboxFactory = new TaskToolboxFactory(
         taskConfig,
         taskActionClientFactory,
@@ -2905,7 +2912,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),
         new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
-        new TaskReportFileWriter(reportsFile)
+        new SingleFileTaskReportFileWriter(reportsFile)
     );
   }
 
@@ -3088,7 +3095,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         @JacksonInject ChatHandlerProvider chatHandlerProvider,
         @JacksonInject AuthorizerMapper authorizerMapper,
         @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
-        @JacksonInject AWSCredentialsConfig awsCredentialsConfig
+        @JacksonInject AWSCredentialsConfig awsCredentialsConfig,
+        @JacksonInject AppenderatorsManager appenderatorsManager
     )
     {
       super(
@@ -3101,7 +3109,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
           chatHandlerProvider,
           authorizerMapper,
           rowIngestionMetersFactory,
-          awsCredentialsConfig
+          awsCredentialsConfig,
+          appenderatorsManager
       );
     }
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index be5a87e..7b5fcf8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -80,6 +80,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
+import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.apache.druid.server.metrics.ExceptionCapturingServiceEmitter;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -4259,7 +4260,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         rowIngestionMetersFactory,
-        null
+        null,
+        new DummyForInjectionAppenderatorsManager()
     );
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
similarity index 68%
copy from indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
index d9c9e81..3e77f1f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/MultipleFileTaskReportFileWriter.java
@@ -24,22 +24,26 @@ import org.apache.commons.io.FileUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.File;
+import java.util.HashMap;
 import java.util.Map;
 
-public class TaskReportFileWriter
+public class MultipleFileTaskReportFileWriter implements TaskReportFileWriter
 {
-  private static final Logger log = new Logger(TaskReportFileWriter.class);
+  private static final Logger log = new Logger(MultipleFileTaskReportFileWriter.class);
+
+  private final Map<String, File> taskReportFiles = new HashMap<>();
 
-  private final File reportsFile;
   private ObjectMapper objectMapper;
 
-  public TaskReportFileWriter(File reportFile)
+  @Override
+  public void write(String taskId, Map<String, TaskReport> reports)
   {
-    this.reportsFile = reportFile;
-  }
+    final File reportsFile = taskReportFiles.get(taskId);
+    if (reportsFile == null) {
+      log.error("Could not find report file for task[%s]", taskId);
+      return;
+    }
 
-  public void write(Map<String, TaskReport> reports)
-  {
     try {
       final File reportsFileParent = reportsFile.getParentFile();
       if (reportsFileParent != null) {
@@ -52,8 +56,19 @@ public class TaskReportFileWriter
     }
   }
 
+  @Override
   public void setObjectMapper(ObjectMapper objectMapper)
   {
     this.objectMapper = objectMapper;
   }
+
+  public void add(String taskId, File reportsFile)
+  {
+    taskReportFiles.put(taskId, reportsFile);
+  }
+
+  public void delete(String taskId)
+  {
+    taskReportFiles.remove(taskId);
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
similarity index 81%
copy from indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
index d9c9e81..adf6ad5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SingleFileTaskReportFileWriter.java
@@ -26,19 +26,20 @@ import org.apache.druid.java.util.common.logger.Logger;
 import java.io.File;
 import java.util.Map;
 
-public class TaskReportFileWriter
+public class SingleFileTaskReportFileWriter implements TaskReportFileWriter
 {
-  private static final Logger log = new Logger(TaskReportFileWriter.class);
+  private static final Logger log = new Logger(SingleFileTaskReportFileWriter.class);
 
   private final File reportsFile;
   private ObjectMapper objectMapper;
 
-  public TaskReportFileWriter(File reportFile)
+  public SingleFileTaskReportFileWriter(File reportsFile)
   {
-    this.reportsFile = reportFile;
+    this.reportsFile = reportsFile;
   }
 
-  public void write(Map<String, TaskReport> reports)
+  @Override
+  public void write(String taskId, Map<String, TaskReport> reports)
   {
     try {
       final File reportsFileParent = reportsFile.getParentFile();
@@ -52,6 +53,8 @@ public class TaskReportFileWriter
     }
   }
 
+
+  @Override
   public void setObjectMapper(ObjectMapper objectMapper)
   {
     this.objectMapper = objectMapper;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
index d9c9e81..908efe6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskReportFileWriter.java
@@ -20,40 +20,12 @@
 package org.apache.druid.indexing.common;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.io.FileUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 
-import java.io.File;
 import java.util.Map;
 
-public class TaskReportFileWriter
+public interface TaskReportFileWriter
 {
-  private static final Logger log = new Logger(TaskReportFileWriter.class);
+  void write(String taskId, Map<String, TaskReport> reports);
 
-  private final File reportsFile;
-  private ObjectMapper objectMapper;
-
-  public TaskReportFileWriter(File reportFile)
-  {
-    this.reportsFile = reportFile;
-  }
-
-  public void write(Map<String, TaskReport> reports)
-  {
-    try {
-      final File reportsFileParent = reportsFile.getParentFile();
-      if (reportsFileParent != null) {
-        FileUtils.forceMkdir(reportsFileParent);
-      }
-      objectMapper.writeValue(reportsFile, reports);
-    }
-    catch (Exception e) {
-      log.error(e, "Encountered exception in write().");
-    }
-  }
-
-  public void setObjectMapper(ObjectMapper objectMapper)
-  {
-    this.objectMapper = objectMapper;
-  }
+  void setObjectMapper(ObjectMapper objectMapper);
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 2d5867b..9ec84de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -76,7 +76,7 @@ import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
-import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
@@ -175,6 +175,9 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
   @JsonIgnore
   private String errorMsg;
 
+  @JsonIgnore
+  private AppenderatorsManager appenderatorsManager;
+
   @JsonCreator
   public AppenderatorDriverRealtimeIndexTask(
       @JsonProperty("id") String id,
@@ -183,7 +186,8 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
       @JsonProperty("context") Map<String, Object> context,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
       @JacksonInject AuthorizerMapper authorizerMapper,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -204,6 +208,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
 
     this.ingestionState = IngestionState.NOT_STARTED;
     this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
+    this.appenderatorsManager = appenderatorsManager;
     this.lockGranularity = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
                            ? LockGranularity.TIME_CHUNK
                            : LockGranularity.SEGMENT;
@@ -282,9 +287,10 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
         log.warn("No chat handler detected");
       }
 
-
-      toolbox.getDataSegmentServerAnnouncer().announce();
-      toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
+      if (appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
+        toolbox.getDataSegmentServerAnnouncer().announce();
+        toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
+      }
 
       driver.startJob(
           segmentId -> {
@@ -415,7 +421,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
       log.makeAlert(e, "Exception aborted realtime processing[%s]", dataSchema.getDataSource())
          .emit();
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
       return TaskStatus.failure(
           getId(),
           errorMsg
@@ -432,12 +438,14 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
 
       toolbox.getMonitorScheduler().removeMonitor(metricsMonitor);
 
-      toolbox.getDataSegmentServerAnnouncer().unannounce();
-      toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+      if (appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
+        toolbox.getDataSegmentServerAnnouncer().unannounce();
+        toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+      }
     }
 
     log.info("Job done!");
-    toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+    toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
     return TaskStatus.success(getId());
   }
 
@@ -473,6 +481,14 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
       catch (Exception e) {
         throw new RuntimeException(e);
       }
+    } else {
+      synchronized (this) {
+        if (!gracefullyStopped) {
+          // If task restore is not enabled, just interrupt immediately.
+          gracefullyStopped = true;
+          runThread.interrupt();
+        }
+      }
     }
   }
 
@@ -728,14 +744,15 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
     );
   }
 
-  private static Appenderator newAppenderator(
+  private Appenderator newAppenderator(
       final DataSchema dataSchema,
       final RealtimeAppenderatorTuningConfig tuningConfig,
       final FireDepartmentMetrics metrics,
       final TaskToolbox toolbox
   )
   {
-    return Appenderators.createRealtime(
+    return appenderatorsManager.createRealtimeAppenderatorForTask(
+        getId(),
         dataSchema,
         tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
         metrics,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 5d3bdf8..6dc1e0f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -74,6 +74,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -145,6 +146,9 @@ public class CompactionTask extends AbstractBatchIndexTask
   @JsonIgnore
   private List<IndexTask> indexTaskSpecs;
 
+  @JsonIgnore
+  private AppenderatorsManager appenderatorsManager;
+
   @JsonCreator
   public CompactionTask(
       @JsonProperty("id") final String id,
@@ -165,7 +169,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
       @JacksonInject CoordinatorClient coordinatorClient,
       @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
-      @JacksonInject RetryPolicyFactory retryPolicyFactory
+      @JacksonInject RetryPolicyFactory retryPolicyFactory,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
@@ -192,6 +197,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     this.coordinatorClient = coordinatorClient;
     this.segmentLoaderFactory = segmentLoaderFactory;
     this.retryPolicyFactory = retryPolicyFactory;
+    this.appenderatorsManager = appenderatorsManager;
   }
 
   @JsonProperty
@@ -307,7 +313,9 @@ public class CompactionTask extends AbstractBatchIndexTask
               getContext(),
               authorizerMapper,
               chatHandlerProvider,
-              rowIngestionMetersFactory
+              rowIngestionMetersFactory,
+              appenderatorsManager
+
           ))
           .collect(Collectors.toList());
     }
@@ -909,6 +917,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     private final CoordinatorClient coordinatorClient;
     private final SegmentLoaderFactory segmentLoaderFactory;
     private final RetryPolicyFactory retryPolicyFactory;
+    private final AppenderatorsManager appenderatorsManager;
 
     @Nullable
     private Interval interval;
@@ -935,7 +944,8 @@ public class CompactionTask extends AbstractBatchIndexTask
         RowIngestionMetersFactory rowIngestionMetersFactory,
         CoordinatorClient coordinatorClient,
         SegmentLoaderFactory segmentLoaderFactory,
-        RetryPolicyFactory retryPolicyFactory
+        RetryPolicyFactory retryPolicyFactory,
+        AppenderatorsManager appenderatorsManager
     )
     {
       this.dataSource = dataSource;
@@ -946,6 +956,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       this.coordinatorClient = coordinatorClient;
       this.segmentLoaderFactory = segmentLoaderFactory;
       this.retryPolicyFactory = retryPolicyFactory;
+      this.appenderatorsManager = appenderatorsManager;
     }
 
     public Builder interval(Interval interval)
@@ -1017,7 +1028,8 @@ public class CompactionTask extends AbstractBatchIndexTask
           rowIngestionMetersFactory,
           coordinatorClient,
           segmentLoaderFactory,
-          retryPolicyFactory
+          retryPolicyFactory,
+          appenderatorsManager
       );
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index f8728c0..7557242 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -128,6 +128,13 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @JsonIgnore
   private String errorMsg;
 
+  @JsonIgnore
+  private Thread runThread;
+
+  @JsonIgnore
+  private boolean stopped = false;
+
+
   /**
    * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters
    *             for creating Druid index segments. It may be modified.
@@ -265,6 +272,14 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @Override
   public TaskStatus run(TaskToolbox toolbox)
   {
+    synchronized (this) {
+      if (stopped) {
+        return TaskStatus.failure(getId());
+      } else {
+        runThread = Thread.currentThread();
+      }
+    }
+
     try {
       taskConfig = toolbox.getConfig();
       if (chatHandlerProvider.isPresent()) {
@@ -288,7 +303,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       }
 
       errorMsg = Throwables.getStackTraceAsString(effectiveException);
-      toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
       return TaskStatus.failure(
           getId(),
           errorMsg
@@ -352,7 +367,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       indexerSchema = determineConfigStatus.getSchema();
       if (indexerSchema == null) {
         errorMsg = determineConfigStatus.getErrorMsg();
-        toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
         return TaskStatus.failure(
             getId(),
             errorMsg
@@ -399,7 +414,7 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
             specVersion,
             version
         );
-        toolbox.getTaskReportFileWriter().write(null);
+        toolbox.getTaskReportFileWriter().write(getId(), null);
         return TaskStatus.failure(getId());
       }
     }
@@ -438,14 +453,14 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       if (buildSegmentsStatus.getDataSegments() != null) {
         ingestionState = IngestionState.COMPLETED;
         toolbox.publishSegments(buildSegmentsStatus.getDataSegments());
-        toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
         return TaskStatus.success(
             getId(),
             null
         );
       } else {
         errorMsg = buildSegmentsStatus.getErrorMsg();
-        toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
         return TaskStatus.failure(
             getId(),
             errorMsg
@@ -463,6 +478,13 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
   @Override
   public void stopGracefully(TaskConfig taskConfig)
   {
+    synchronized (this) {
+      stopped = true;
+      if (runThread == null) {
+        // didn't actually start, just return
+        return;
+      }
+    }
     // To avoid issue of kill command once the ingestion task is actually completed
     if (!ingestionState.equals(IngestionState.COMPLETED)) {
       final ClassLoader oldLoader = Thread.currentThread().getContextClassLoader();
@@ -497,9 +519,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
       }
       finally {
         Thread.currentThread().setContextClassLoader(oldLoader);
+        runThread.interrupt();
       }
     }
-
   }
 
   @GET
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index b6a4126..b89772c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -48,6 +48,7 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
@@ -73,7 +74,7 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
-import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
@@ -171,6 +172,18 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   @JsonIgnore
   private String errorMsg;
 
+  @JsonIgnore
+  private final AppenderatorsManager appenderatorsManager;
+
+  @JsonIgnore
+  private Thread runThread;
+
+  @JsonIgnore
+  private boolean stopped = false;
+
+  @JsonIgnore
+  private Appenderator appenderator;
+
   @JsonCreator
   public IndexTask(
       @JsonProperty("id") final String id,
@@ -179,7 +192,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       @JsonProperty("context") final Map<String, Object> context,
       @JacksonInject AuthorizerMapper authorizerMapper,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     this(
@@ -191,7 +205,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         context,
         authorizerMapper,
         chatHandlerProvider,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
   }
 
@@ -204,7 +219,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       Map<String, Object> context,
       AuthorizerMapper authorizerMapper,
       ChatHandlerProvider chatHandlerProvider,
-      RowIngestionMetersFactory rowIngestionMetersFactory
+      RowIngestionMetersFactory rowIngestionMetersFactory,
+      AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -229,6 +245,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     this.ingestionState = IngestionState.NOT_STARTED;
     this.determinePartitionsMeters = rowIngestionMetersFactory.createRowIngestionMeters();
     this.buildSegmentsMeters = rowIngestionMetersFactory.createRowIngestionMeters();
+    this.appenderatorsManager = appenderatorsManager;
   }
 
   @Override
@@ -403,6 +420,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   @Override
   public TaskStatus run(final TaskToolbox toolbox)
   {
+    synchronized (this) {
+      if (stopped) {
+        return TaskStatus.failure(getId());
+      } else {
+        runThread = Thread.currentThread();
+      }
+    }
+
     try {
       if (chatHandlerProvider.isPresent()) {
         log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@@ -478,7 +503,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     catch (Exception e) {
       log.error(e, "Encountered exception in %s.", ingestionState);
       errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+      toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
       return TaskStatus.failure(
           getId(),
           errorMsg
@@ -492,6 +517,23 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+    synchronized (this) {
+      stopped = true;
+      // Nothing else to do for native batch except terminate
+      if (ingestionState != IngestionState.COMPLETED) {
+        if (appenderator != null) {
+          appenderator.closeNow();
+        }
+        if (runThread != null) {
+          runThread.interrupt();
+        }
+      }
+    }
+  }
+
   private Map<String, TaskReport> getTaskCompletionReports()
   {
     return TaskReport.buildTaskReports(
@@ -882,6 +924,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
         final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
     ) {
+      this.appenderator = appenderator;
+
       driver.startJob();
 
       while (firehose.hasMore()) {
@@ -950,7 +994,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       if (published == null) {
         log.error("Failed to publish segments, aborting!");
         errorMsg = "Failed to publish segments.";
-        toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
         return TaskStatus.failure(
             getId(),
             errorMsg
@@ -964,7 +1008,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         );
         log.info("Published segments: %s", Lists.transform(published.getSegments(), DataSegment::getId));
 
-        toolbox.getTaskReportFileWriter().write(getTaskCompletionReports());
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
         return TaskStatus.success(getId());
       }
     }
@@ -1046,14 +1090,15 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
-  private static Appenderator newAppenderator(
+  private Appenderator newAppenderator(
       FireDepartmentMetrics metrics,
       TaskToolbox toolbox,
       DataSchema dataSchema,
       IndexTuningConfig tuningConfig
   )
   {
-    return Appenderators.createOffline(
+    return appenderatorsManager.createOfflineAppenderatorForTask(
+        getId(),
         dataSchema,
         tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
         metrics,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index ce2b1c6..c0c7c13 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -518,6 +518,14 @@ public class RealtimeIndexTask extends AbstractTask
       catch (Exception e) {
         throw new RuntimeException(e);
       }
+    } else {
+      synchronized (this) {
+        if (!gracefullyStopped) {
+          // If task restore is not enabled, just interrupt immediately.
+          gracefullyStopped = true;
+          runThread.interrupt();
+        }
+      }
     }
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 8d98df8..ae40153 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SurrogateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.AbstractBatchIndexTask;
 import org.apache.druid.indexing.common.task.ClientBasedTaskInfoProvider;
 import org.apache.druid.indexing.common.task.IndexTask;
@@ -63,7 +64,7 @@ import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
-import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
 import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
@@ -105,6 +106,11 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
   private final String supervisorTaskId;
   private final IndexingServiceClient indexingServiceClient;
   private final IndexTaskClientFactory<ParallelIndexTaskClient> taskClientFactory;
+  private final AppenderatorsManager appenderatorsManager;
+
+  private Appenderator appenderator;
+  private Thread runThread;
+  private boolean stopped = false;
 
   @JsonCreator
   public ParallelIndexSubTask(
@@ -117,7 +123,8 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
       @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
       @JsonProperty("context") final Map<String, Object> context,
       @JacksonInject IndexingServiceClient indexingServiceClient,
-      @JacksonInject IndexTaskClientFactory<ParallelIndexTaskClient> taskClientFactory
+      @JacksonInject IndexTaskClientFactory<ParallelIndexTaskClient> taskClientFactory,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -137,6 +144,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
     this.supervisorTaskId = supervisorTaskId;
     this.indexingServiceClient = indexingServiceClient;
     this.taskClientFactory = taskClientFactory;
+    this.appenderatorsManager = appenderatorsManager;
   }
 
   @Override
@@ -189,6 +197,14 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
+    synchronized (this) {
+      if (stopped) {
+        return TaskStatus.failure(getId());
+      } else {
+        runThread = Thread.currentThread();
+      }
+    }
+
     final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
 
     final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
@@ -399,6 +415,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
         final BatchAppenderatorDriver driver = newDriver(appenderator, toolbox, segmentAllocator);
         final Firehose firehose = firehoseFactory.connect(dataSchema.getParser(), firehoseTempDir)
     ) {
+      this.appenderator = appenderator;
       driver.startJob();
 
       final Set<DataSegment> pushedSegments = new HashSet<>();
@@ -468,6 +485,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
     }
   }
 
+
   private static Granularity findSegmentGranularity(GranularitySpec granularitySpec)
   {
     if (granularitySpec instanceof UniformGranularitySpec) {
@@ -477,14 +495,15 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
     }
   }
 
-  private static Appenderator newAppenderator(
+  private Appenderator newAppenderator(
       FireDepartmentMetrics metrics,
       TaskToolbox toolbox,
       DataSchema dataSchema,
       ParallelIndexTuningConfig tuningConfig
   )
   {
-    return Appenderators.createOffline(
+    return appenderatorsManager.createOfflineAppenderatorForTask(
+        getId(),
         dataSchema,
         tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
         metrics,
@@ -508,4 +527,18 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
         toolbox.getDataSegmentKiller()
     );
   }
+
+  @Override
+  public void stopGracefully(TaskConfig taskConfig)
+  {
+    synchronized (this) {
+      stopped = true;
+      if (appenderator != null) {
+        appenderator.closeNow();
+      }
+      if (runThread != null) {
+        runThread.interrupt();
+      }
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTaskSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTaskSpec.java
index ee64f78..1393865 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTaskSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTaskSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 
 import java.util.Map;
 
@@ -61,7 +62,8 @@ class ParallelIndexSubTaskSpec extends SubTaskSpec<ParallelIndexSubTask>
         getIngestionSpec(),
         getContext(),
         null,
-        null
+        null,
+        new DummyForInjectionAppenderatorsManager()
     );
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 8765884..1f1b571 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -106,10 +107,14 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   private final ChatHandlerProvider chatHandlerProvider;
   private final AuthorizerMapper authorizerMapper;
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
+  private final AppenderatorsManager appenderatorsManager;
 
   private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();
 
   private volatile ParallelIndexTaskRunner runner;
+  private volatile IndexTask sequentialIndexTask;
+
+  private boolean stopped = false;
 
   // toolbox is initlized when run() is called, and can be used for processing HTTP endpoint requests.
   private volatile TaskToolbox toolbox;
@@ -123,7 +128,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
       @JacksonInject @Nullable IndexingServiceClient indexingServiceClient, // null in overlords
       @JacksonInject @Nullable ChatHandlerProvider chatHandlerProvider,     // null in overlords
       @JacksonInject AuthorizerMapper authorizerMapper,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -146,6 +152,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     this.chatHandlerProvider = chatHandlerProvider;
     this.authorizerMapper = authorizerMapper;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+    this.appenderatorsManager = appenderatorsManager;
 
     if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
         != TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
@@ -271,8 +278,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @Override
   public void stopGracefully(TaskConfig taskConfig)
   {
+    synchronized (this) {
+      stopped = true;
+    }
     if (runner != null) {
       runner.stopGracefully();
+    } else if (sequentialIndexTask != null) {
+      sequentialIndexTask.stopGracefully(taskConfig);
     }
   }
 
@@ -328,29 +340,40 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
   private TaskStatus runParallel(TaskToolbox toolbox) throws Exception
   {
-    createRunner(toolbox);
+    synchronized (this) {
+      if (stopped) {
+        return TaskStatus.failure(getId());
+      }
+      createRunner(toolbox);
+    }
     return TaskStatus.fromCode(getId(), Preconditions.checkNotNull(runner, "runner").run());
   }
 
   private TaskStatus runSequential(TaskToolbox toolbox) throws Exception
   {
-    final IndexTask indexTask = new IndexTask(
-        getId(),
-        getGroupId(),
-        getTaskResource(),
-        getDataSource(),
-        new IndexIngestionSpec(
-            getIngestionSchema().getDataSchema(),
-            getIngestionSchema().getIOConfig(),
-            convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
-        ),
-        getContext(),
-        authorizerMapper,
-        chatHandlerProvider,
-        rowIngestionMetersFactory
-    );
-    if (indexTask.isReady(toolbox.getTaskActionClient())) {
-      return indexTask.run(toolbox);
+    synchronized (this) {
+      if (stopped) {
+        return TaskStatus.failure(getId());
+      }
+      sequentialIndexTask = new IndexTask(
+          getId(),
+          getGroupId(),
+          getTaskResource(),
+          getDataSource(),
+          new IndexIngestionSpec(
+              getIngestionSchema().getDataSchema(),
+              getIngestionSchema().getIOConfig(),
+              convertToIndexTuningConfig(getIngestionSchema().getTuningConfig())
+          ),
+          getContext(),
+          authorizerMapper,
+          chatHandlerProvider,
+          rowIngestionMetersFactory,
+          appenderatorsManager
+      );
+    }
+    if (sequentialIndexTask.isReady(toolbox.getTaskActionClient())) {
+      return sequentialIndexTask.run(toolbox);
     } else {
       return TaskStatus.failure(getId());
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
new file mode 100644
index 0000000..4c9910f
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for {@link ForkingTaskRunner} and {@link ThreadingTaskRunner} which support task restoration.
+ */
+public abstract class BaseRestorableTaskRunner<WorkItemType extends TaskRunnerWorkItem> implements TaskRunner
+{
+  protected static final EmittingLogger LOG = new EmittingLogger(BaseRestorableTaskRunner.class);
+  protected static final String TASK_RESTORE_FILENAME = "restore.json";
+
+  protected final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
+
+  /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
+  protected final ConcurrentHashMap<String, WorkItemType> tasks = new ConcurrentHashMap<>();
+  protected final ObjectMapper jsonMapper;
+  protected final TaskConfig taskConfig;
+
+  public BaseRestorableTaskRunner(
+      ObjectMapper jsonMapper,
+      TaskConfig taskConfig
+  )
+  {
+    this.jsonMapper = jsonMapper;
+    this.taskConfig = taskConfig;
+  }
+
+  @Override
+  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
+  {
+    final File restoreFile = getRestoreFile();
+    final TaskRestoreInfo taskRestoreInfo;
+    if (restoreFile.exists()) {
+      try {
+        taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
+      }
+      catch (Exception e) {
+        LOG.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
+        return ImmutableList.of();
+      }
+    } else {
+      return ImmutableList.of();
+    }
+
+    final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = new ArrayList<>();
+    for (final String taskId : taskRestoreInfo.getRunningTasks()) {
+      try {
+        final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
+        final Task task = jsonMapper.readValue(taskFile, Task.class);
+
+        if (!task.getId().equals(taskId)) {
+          throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
+        }
+
+        if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
+          LOG.info("Restoring task[%s].", task.getId());
+          retVal.add(Pair.of(task, run(task)));
+        }
+      }
+      catch (Exception e) {
+        LOG.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
+      }
+    }
+
+    LOG.info("Restored %,d tasks.", retVal.size());
+
+    return retVal;
+  }
+
+  @Override
+  public void registerListener(TaskRunnerListener listener, Executor executor)
+  {
+    for (Pair<TaskRunnerListener, Executor> pair : listeners) {
+      if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
+        throw new ISE("Listener [%s] already registered", listener.getListenerId());
+      }
+    }
+
+    final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
+
+    synchronized (tasks) {
+      for (TaskRunnerWorkItem item : tasks.values()) {
+        TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
+      }
+
+      listeners.add(listenerPair);
+      LOG.info("Registered listener [%s]", listener.getListenerId());
+    }
+  }
+
+  @Override
+  public void unregisterListener(String listenerId)
+  {
+    for (Pair<TaskRunnerListener, Executor> pair : listeners) {
+      if (pair.lhs.getListenerId().equals(listenerId)) {
+        listeners.remove(pair);
+        LOG.info("Unregistered listener [%s]", listenerId);
+        return;
+      }
+    }
+  }
+
+  @Override
+  public abstract Collection<TaskRunnerWorkItem> getRunningTasks();
+
+  @Override
+  public abstract Collection<TaskRunnerWorkItem> getPendingTasks();
+
+  @Nullable
+  @Override
+  public abstract RunnerTaskState getRunnerTaskState(String taskId);
+
+  @Override
+  public Collection<TaskRunnerWorkItem> getKnownTasks()
+  {
+    synchronized (tasks) {
+      return Lists.newArrayList(tasks.values());
+    }
+  }
+
+  /**
+   * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
+   * occur while saving.
+   */
+  @GuardedBy("tasks")
+  protected void saveRunningTasks()
+  {
+    final File restoreFile = getRestoreFile();
+    final List<String> theTasks = new ArrayList<>();
+    for (TaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) {
+      theTasks.add(forkingTaskRunnerWorkItem.getTaskId());
+    }
+
+    try {
+      Files.createParentDirs(restoreFile);
+      jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks));
+    }
+    catch (Exception e) {
+      LOG.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
+    }
+  }
+
+  protected File getRestoreFile()
+  {
+    return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
+  }
+
+  protected static class TaskRestoreInfo
+  {
+    @JsonProperty
+    private final List<String> runningTasks;
+
+    @JsonCreator
+    public TaskRestoreInfo(
+        @JsonProperty("runningTasks") List<String> runningTasks
+    )
+    {
+      this.runningTasks = runningTasks;
+    }
+
+    public List<String> getRunningTasks()
+    {
+      return runningTasks;
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 88da445..acf234a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -19,8 +19,6 @@
 
 package org.apache.druid.indexing.overlord;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.CharMatcher;
 import com.google.common.base.Joiner;
@@ -30,7 +28,6 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.google.common.io.ByteSink;
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
@@ -39,7 +36,6 @@ import com.google.common.io.Files;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.guice.annotations.Self;
@@ -55,7 +51,6 @@ import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
@@ -83,31 +78,23 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Runs tasks in separate processes using the "internal peon" verb.
  */
-public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
+public class ForkingTaskRunner
+    extends BaseRestorableTaskRunner<ForkingTaskRunner.ForkingTaskRunnerWorkItem>
+    implements TaskLogStreamer
 {
-  private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
+  private static final EmittingLogger LOGGER = new EmittingLogger(ForkingTaskRunner.class);
   private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
-  private static final String TASK_RESTORE_FILENAME = "restore.json";
   private final ForkingTaskRunnerConfig config;
-  private final TaskConfig taskConfig;
   private final Properties props;
   private final TaskLogPusher taskLogPusher;
   private final DruidNode node;
   private final ListeningExecutorService exec;
-  private final ObjectMapper jsonMapper;
   private final PortFinder portFinder;
-  private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
-
-  /** Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting. */
-  private final ConcurrentHashMap<String, ForkingTaskRunnerWorkItem> tasks = new ConcurrentHashMap<>();
 
   private volatile boolean stopping = false;
 
@@ -122,11 +109,10 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
       @Self DruidNode node
   )
   {
+    super(jsonMapper, taskConfig);
     this.config = config;
-    this.taskConfig = taskConfig;
     this.props = props;
     this.taskLogPusher = taskLogPusher;
-    this.jsonMapper = jsonMapper;
     this.node = node;
     this.portFinder = new PortFinder(config.getStartPort(), config.getEndPort(), config.getPorts());
     this.exec = MoreExecutors.listeningDecorator(
@@ -135,81 +121,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   }
 
   @Override
-  public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
-  {
-    final File restoreFile = getRestoreFile();
-    final TaskRestoreInfo taskRestoreInfo;
-    if (restoreFile.exists()) {
-      try {
-        taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
-      }
-      catch (Exception e) {
-        log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile);
-        return ImmutableList.of();
-      }
-    } else {
-      return ImmutableList.of();
-    }
-
-    final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = new ArrayList<>();
-    for (final String taskId : taskRestoreInfo.getRunningTasks()) {
-      try {
-        final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json");
-        final Task task = jsonMapper.readValue(taskFile, Task.class);
-
-        if (!task.getId().equals(taskId)) {
-          throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId());
-        }
-
-        if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
-          log.info("Restoring task[%s].", task.getId());
-          retVal.add(Pair.of(task, run(task)));
-        }
-      }
-      catch (Exception e) {
-        log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId);
-      }
-    }
-
-    log.info("Restored %,d tasks.", retVal.size());
-
-    return retVal;
-  }
-
-  @Override
-  public void registerListener(TaskRunnerListener listener, Executor executor)
-  {
-    for (Pair<TaskRunnerListener, Executor> pair : listeners) {
-      if (pair.lhs.getListenerId().equals(listener.getListenerId())) {
-        throw new ISE("Listener [%s] already registered", listener.getListenerId());
-      }
-    }
-
-    final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);
-
-    synchronized (tasks) {
-      for (ForkingTaskRunnerWorkItem item : tasks.values()) {
-        TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
-      }
-
-      listeners.add(listenerPair);
-      log.info("Registered listener [%s]", listener.getListenerId());
-    }
-  }
-
-  @Override
-  public void unregisterListener(String listenerId)
-  {
-    for (Pair<TaskRunnerListener, Executor> pair : listeners) {
-      if (pair.lhs.getListenerId().equals(listenerId)) {
-        listeners.remove(pair);
-        log.info("Unregistered listener [%s]", listenerId);
-        return;
-      }
-    }
-  }
-
-  @Override
   public ListenableFuture<TaskStatus> run(final Task task)
   {
     synchronized (tasks) {
@@ -257,19 +168,19 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
                       synchronized (tasks) {
                         final ForkingTaskRunnerWorkItem taskWorkItem = tasks.get(task.getId());
 
-                        if (taskWorkItem.shutdown) {
-                          throw new IllegalStateException("Task has been shut down!");
-                        }
-
                         if (taskWorkItem == null) {
-                          log.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
+                          LOGGER.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
                           throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
                         }
 
+                        if (taskWorkItem.shutdown) {
+                          throw new IllegalStateException("Task has been shut down!");
+                        }
+
                         if (taskWorkItem.processHolder != null) {
-                          log.makeAlert("WTF?! TaskInfo already has a processHolder")
-                            .addData("task", task.getId())
-                            .emit();
+                          LOGGER.makeAlert("WTF?! TaskInfo already has a processHolder")
+                                .addData("task", task.getId())
+                                .emit();
                           throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
                         }
 
@@ -403,7 +314,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
                           jsonMapper.writeValue(taskFile, task);
                         }
 
-                        log.info("Running command: %s", Joiner.on(" ").join(command));
+                        LOGGER.info("Running command: %s", Joiner.on(" ").join(command));
                         taskWorkItem.processHolder = new ProcessHolder(
                           new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
                           logFile,
@@ -423,7 +334,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
                           TaskStatus.running(task.getId())
                       );
 
-                      log.info("Logging task %s output to: %s", task.getId(), logFile);
+                      LOGGER.info("Logging task %s output to: %s", task.getId(), logFile);
                       boolean runFailed = true;
 
                       final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND);
@@ -435,7 +346,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
                       try (final OutputStream toLogfile = logSink.openStream()) {
                         ByteStreams.copy(processHolder.process.getInputStream(), toLogfile);
                         final int statusCode = processHolder.process.waitFor();
-                        log.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
+                        LOGGER.info("Process exited with status[%d] for task: %s", statusCode, task.getId());
                         if (statusCode == 0) {
                           runFailed = false;
                         }
@@ -469,7 +380,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
                     }
                   }
                   catch (Throwable t) {
-                    log.info(t, "Exception caught during execution");
+                    LOGGER.info(t, "Exception caught during execution");
                     throw new RuntimeException(t);
                   }
                   finally {
@@ -493,19 +404,19 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
 
                       try {
                         if (!stopping && taskDir.exists()) {
-                          log.info("Removing task directory: %s", taskDir);
+                          LOGGER.info("Removing task directory: %s", taskDir);
                           FileUtils.deleteDirectory(taskDir);
                         }
                       }
                       catch (Exception e) {
-                        log.makeAlert(e, "Failed to delete task directory")
-                          .addData("taskDir", taskDir.toString())
-                          .addData("task", task.getId())
-                          .emit();
+                        LOGGER.makeAlert(e, "Failed to delete task directory")
+                              .addData("taskDir", taskDir.toString())
+                              .addData("task", task.getId())
+                              .emit();
                       }
                     }
                     catch (Exception e) {
-                      log.error(e, "Suppressing exception caught while cleaning up task");
+                      LOGGER.error(e, "Suppressing exception caught while cleaning up task");
                     }
                   }
                 }
@@ -535,25 +446,25 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
     final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis();
 
     // Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff.
-    log.info("Waiting up to %,dms for shutdown.", timeout);
+    LOGGER.info("Waiting up to %,dms for shutdown.", timeout);
     if (timeout > 0) {
       try {
         final boolean terminated = exec.awaitTermination(timeout, TimeUnit.MILLISECONDS);
         final long elapsed = System.currentTimeMillis() - start.getMillis();
         if (terminated) {
-          log.info("Finished stopping in %,dms.", elapsed);
+          LOGGER.info("Finished stopping in %,dms.", elapsed);
         } else {
           final Set<String> stillRunning;
           synchronized (tasks) {
             stillRunning = ImmutableSet.copyOf(tasks.keySet());
           }
 
-          log.makeAlert("Failed to stop forked tasks")
-             .addData("stillRunning", stillRunning)
-             .addData("elapsed", elapsed)
-             .emit();
+          LOGGER.makeAlert("Failed to stop forked tasks")
+                .addData("stillRunning", stillRunning)
+                .addData("elapsed", elapsed)
+                .emit();
 
-          log.warn(
+          LOGGER.warn(
               "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
               elapsed,
               Joiner.on("; ").join(stillRunning)
@@ -561,25 +472,25 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
         }
       }
       catch (InterruptedException e) {
-        log.warn(e, "Interrupted while waiting for executor to finish.");
+        LOGGER.warn(e, "Interrupted while waiting for executor to finish.");
         Thread.currentThread().interrupt();
       }
     } else {
-      log.warn("Ran out of time, not waiting for executor to finish!");
+      LOGGER.warn("Ran out of time, not waiting for executor to finish!");
     }
   }
 
   @Override
   public void shutdown(final String taskid, String reason)
   {
-    log.info("Shutdown [%s] because: [%s]", taskid, reason);
+    LOGGER.info("Shutdown [%s] because: [%s]", taskid, reason);
     final ForkingTaskRunnerWorkItem taskInfo;
 
     synchronized (tasks) {
       taskInfo = tasks.get(taskid);
 
       if (taskInfo == null) {
-        log.info("Ignoring request to cancel unknown task: %s", taskid);
+        LOGGER.info("Ignoring request to cancel unknown task: %s", taskid);
         return;
       }
 
@@ -617,14 +528,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
     }
   }
 
-  @Override
-  public Collection<TaskRunnerWorkItem> getKnownTasks()
-  {
-    synchronized (tasks) {
-      return Lists.newArrayList(tasks.values());
-    }
-  }
-
   @Nullable
   @Override
   public RunnerTaskState getRunnerTaskState(String taskId)
@@ -682,28 +585,6 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   }
 
   /**
-   * Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that occur
-   * while saving.
-   */
-  @GuardedBy("tasks")
-  private void saveRunningTasks()
-  {
-    final File restoreFile = getRestoreFile();
-    final List<String> theTasks = new ArrayList<>();
-    for (ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) {
-      theTasks.add(forkingTaskRunnerWorkItem.getTaskId());
-    }
-
-    try {
-      Files.createParentDirs(restoreFile);
-      jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks));
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile);
-    }
-  }
-
-  /**
    * Close task output stream (input stream of process) sending EOF telling process to terminate, destroying the process
    * if an exception is encountered.
    */
@@ -711,42 +592,18 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
   {
     if (taskInfo.processHolder != null) {
       // Will trigger normal failure mechanisms due to process exit
-      log.info("Closing output stream to task[%s].", taskInfo.getTask().getId());
+      LOGGER.info("Closing output stream to task[%s].", taskInfo.getTask().getId());
       try {
         taskInfo.processHolder.process.getOutputStream().close();
       }
       catch (Exception e) {
-        log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId());
+        LOGGER.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskInfo.getTask().getId());
         taskInfo.processHolder.process.destroy();
       }
     }
   }
 
-  private File getRestoreFile()
-  {
-    return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME);
-  }
-
-  private static class TaskRestoreInfo
-  {
-    @JsonProperty
-    private final List<String> runningTasks;
-
-    @JsonCreator
-    public TaskRestoreInfo(
-        @JsonProperty("runningTasks") List<String> runningTasks
-    )
-    {
-      this.runningTasks = runningTasks;
-    }
-
-    public List<String> getRunningTasks()
-    {
-      return runningTasks;
-    }
-  }
-
-  private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
+  protected static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem
   {
     private final Task task;
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
new file mode 100644
index 0000000..c0bcaf4
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.ByteSource;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogPusher;
+import org.apache.druid.tasklogs.TaskLogStreamer;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * TaskRunner implemention for the CliIndexer task execution service, which runs all tasks in a single process.
+ *
+ * Two thread pools are used:
+ * - A task execution pool, sized to number of worker slots. This is used to setup and execute the Task run() methods.
+ * - A control thread pool, sized to number of worker slots. The control threads are responsible for running graceful
+ *   shutdown on the Task objects. Only one shutdown per-task can be running at a given time,
+ *   so we allocate one control thread per worker slot.
+ *
+ * Note that separate task logs are not currently supported, all task log entries will be written to the Indexer
+ * process log instead.
+ */
+public class ThreadingTaskRunner
+    extends BaseRestorableTaskRunner<ThreadingTaskRunner.ThreadingTaskRunnerWorkItem>
+    implements TaskLogStreamer, QuerySegmentWalker
+{
+  private static final EmittingLogger LOGGER = new EmittingLogger(ThreadingTaskRunner.class);
+
+  private final TaskToolboxFactory toolboxFactory;
+  private final TaskLogPusher taskLogPusher;
+  private final DruidNode node;
+  private final AppenderatorsManager appenderatorsManager;
+  private final MultipleFileTaskReportFileWriter taskReportFileWriter;
+  private final ListeningExecutorService taskExecutor;
+  private final ListeningExecutorService controlThreadExecutor;
+
+  private volatile boolean stopping = false;
+
+  @Inject
+  public ThreadingTaskRunner(
+      TaskToolboxFactory toolboxFactory,
+      TaskConfig taskConfig,
+      WorkerConfig workerConfig,
+      TaskLogPusher taskLogPusher,
+      ObjectMapper jsonMapper,
+      AppenderatorsManager appenderatorsManager,
+      TaskReportFileWriter taskReportFileWriter,
+      @Self DruidNode node
+  )
+  {
+    super(jsonMapper, taskConfig);
+    this.toolboxFactory = toolboxFactory;
+    this.taskLogPusher = taskLogPusher;
+    this.node = node;
+    this.appenderatorsManager = appenderatorsManager;
+    this.taskReportFileWriter = (MultipleFileTaskReportFileWriter) taskReportFileWriter;
+    this.taskExecutor = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-executor-%d")
+    );
+    this.controlThreadExecutor = MoreExecutors.listeningDecorator(
+        Execs.multiThreaded(workerConfig.getCapacity(), "threading-task-runner-control-%d")
+    );
+  }
+
+  @Override
+  public Optional<ByteSource> streamTaskLog(String taskid, long offset) throws IOException
+  {
+    // task logs will appear in the main indexer log, streaming individual task logs is not supported
+    return Optional.absent();
+  }
+
+  @Override
+  public void start()
+  {
+    // Nothing to start.
+  }
+
+  @Override
+  public ListenableFuture<TaskStatus> run(Task task)
+  {
+    synchronized (tasks) {
+      tasks.computeIfAbsent(
+          task.getId(), k ->
+              new ThreadingTaskRunnerWorkItem(
+                  task,
+                  taskExecutor.submit(
+                      new Callable<TaskStatus>() {
+                        @Override
+                        public TaskStatus call()
+                        {
+                          final String attemptUUID = UUID.randomUUID().toString();
+                          final File taskDir = taskConfig.getTaskDir(task.getId());
+                          final File attemptDir = new File(taskDir, attemptUUID);
+
+                          final TaskLocation taskLocation = TaskLocation.create(
+                              node.getHost(),
+                              node.getPlaintextPort(),
+                              node.getTlsPort()
+                          );
+
+                          final ThreadingTaskRunnerWorkItem taskWorkItem;
+
+                          try {
+                            if (!attemptDir.mkdirs()) {
+                              throw new IOE("Could not create directories: %s", attemptDir);
+                            }
+
+                            final File taskFile = new File(taskDir, "task.json");
+                            final File reportsFile = new File(attemptDir, "report.json");
+                            taskReportFileWriter.add(task.getId(), reportsFile);
+
+                            // time to adjust process holders
+                            synchronized (tasks) {
+                              taskWorkItem = tasks.get(task.getId());
+
+                              if (taskWorkItem == null) {
+                                LOGGER.makeAlert("WTF?! TaskInfo disappeared!").addData("task", task.getId()).emit();
+                                throw new ISE("TaskInfo disappeared for task[%s]!", task.getId());
+                              }
+
+                              if (taskWorkItem.shutdown) {
+                                throw new IllegalStateException("Task has been shut down!");
+                              }
+                            }
+
+
+                            if (!taskFile.exists()) {
+                              jsonMapper.writeValue(taskFile, task);
+                            }
+
+                            // This will block for a while. So we append the thread information with more details
+                            final String priorThreadName = Thread.currentThread().getName();
+                            Thread.currentThread()
+                                  .setName(StringUtils.format("%s-[%s]", priorThreadName, task.getId()));
+
+                            TaskStatus taskStatus = null;
+                            final TaskToolbox toolbox = toolboxFactory.build(task);
+                            TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
+                            TaskRunnerUtils.notifyStatusChanged(
+                                listeners,
+                                task.getId(),
+                                TaskStatus.running(task.getId())
+                            );
+
+                            taskWorkItem.setState(RunnerTaskState.RUNNING);
+                            try {
+                              taskStatus = task.run(toolbox);
+                            }
+                            catch (Throwable t) {
+                              LOGGER.error(t, "Exception caught while running the task.");
+                            }
+                            finally {
+                              taskWorkItem.setState(RunnerTaskState.NONE);
+                              if (taskStatus == null) {
+                                taskStatus = TaskStatus.failure(task.getId());
+                              }
+                              Thread.currentThread().setName(priorThreadName);
+                              if (reportsFile.exists()) {
+                                taskLogPusher.pushTaskReports(task.getId(), reportsFile);
+                              }
+                            }
+
+                            TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), taskStatus);
+                            return taskStatus;
+                          }
+                          catch (Throwable t) {
+                            LOGGER.error(t, "Exception caught during execution");
+                            throw new RuntimeException(t);
+                          }
+                          finally {
+                            try {
+                              taskReportFileWriter.delete(task.getId());
+                              appenderatorsManager.removeAppenderatorForTask(task.getId());
+
+                              synchronized (tasks) {
+                                tasks.remove(task.getId());
+                                if (!stopping) {
+                                  saveRunningTasks();
+                                }
+                              }
+
+                              try {
+                                if (!stopping && taskDir.exists()) {
+                                  LOGGER.info("Removing task directory: %s", taskDir);
+                                  FileUtils.deleteDirectory(taskDir);
+                                }
+                              }
+                              catch (Exception e) {
+                                LOGGER.makeAlert(e, "Failed to delete task directory")
+                                      .addData("taskDir", taskDir.toString())
+                                      .addData("task", task.getId())
+                                      .emit();
+                              }
+                            }
+                            catch (Exception e) {
+                              LOGGER.error(e, "Suppressing exception caught while cleaning up task");
+                            }
+                          }
+                        }
+                      }
+                  )
+              )
+      );
+      saveRunningTasks();
+      return tasks.get(task.getId()).getResult();
+    }
+  }
+
+  @Override
+  public void shutdown(String taskid, String reason)
+  {
+    LOGGER.info("Shutdown [%s] because: [%s]", taskid, reason);
+    final ThreadingTaskRunnerWorkItem taskInfo;
+
+    synchronized (tasks) {
+      taskInfo = tasks.get(taskid);
+
+      if (taskInfo == null) {
+        LOGGER.info("Ignoring request to cancel unknown task: %s", taskid);
+        return;
+      }
+
+      if (taskInfo.shutdown) {
+        LOGGER.info(
+            "Task [%s] is already shutting down, ignoring duplicate shutdown request with reason [%s]",
+            taskid,
+            reason
+        );
+      } else {
+        taskInfo.shutdown = true;
+        scheduleTaskShutdown(taskInfo);
+      }
+    }
+  }
+
+  /**
+   * Submits a callable to the control thread pool that attempts a task graceful shutdown,
+   * if shutdown is not already scheduled.
+   *
+   * The shutdown will wait for the configured timeout and then interrupt the thread if the timeout is exceeded.
+   */
+  private ListenableFuture scheduleTaskShutdown(ThreadingTaskRunnerWorkItem taskInfo)
+  {
+    synchronized (tasks) {
+      if (taskInfo.shutdownFuture != null) {
+        return taskInfo.shutdownFuture;
+      }
+
+      taskInfo.shutdownFuture = controlThreadExecutor.submit(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call()
+            {
+              LOGGER.info("Stopping thread for task: %s", taskInfo.getTaskId());
+              taskInfo.getTask().stopGracefully(taskConfig);
+
+              try {
+                taskInfo.getResult().get(
+                    taskConfig.getGracefulShutdownTimeout().toStandardDuration().getMillis(),
+                    TimeUnit.MILLISECONDS
+                );
+              }
+              catch (TimeoutException e) {
+                // Note that we can't truly force a hard termination of the task, interrupting the thread
+                // running the task to hopefully have it stop.
+                // In the future we may want to add a forceful shutdown method to the Task interface.
+                taskInfo.getResult().cancel(true);
+              }
+              catch (Exception e) {
+                LOGGER.info(e, "Encountered exception while waiting for task [%s] shutdown", taskInfo.getTaskId());
+                if (taskInfo.shutdownFuture != null) {
+                  taskInfo.shutdownFuture.cancel(true);
+                }
+              }
+              return null;
+            }
+          }
+      );
+
+      return taskInfo.shutdownFuture;
+    }
+  }
+
+  /**
+   * First shuts down the task execution pool and then schedules a graceful shutdown attempt for each active task.
+   *
+   * After the tasks shutdown gracefully or the graceful shutdown timeout is exceeded, the control thread pool
+   * will be terminated (also waiting for the graceful shutdown period for this termination).
+   */
+  @Override
+  public void stop()
+  {
+    stopping = true;
+    taskExecutor.shutdown();
+
+    List<ListenableFuture<Void>> shutdownFutures = new ArrayList<>();
+    synchronized (tasks) {
+      for (ThreadingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
+        shutdownFutures.add(scheduleTaskShutdown(taskWorkItem));
+      }
+    }
+    controlThreadExecutor.shutdown();
+    try {
+      ListenableFuture<List<Void>> shutdownFuture = Futures.successfulAsList(shutdownFutures);
+      shutdownFuture.get();
+    }
+    catch (Exception e) {
+      LOGGER.error(e, "Encountered exception when stopping all tasks.");
+    }
+
+    final DateTime start = DateTimes.nowUtc();
+    final long gracefulShutdownMillis = taskConfig.getGracefulShutdownTimeout().toStandardDuration().getMillis();
+
+    LOGGER.info("Waiting up to %,dms for shutdown.", gracefulShutdownMillis);
+    if (gracefulShutdownMillis > 0) {
+      try {
+        final boolean terminated = controlThreadExecutor.awaitTermination(
+            gracefulShutdownMillis,
+            TimeUnit.MILLISECONDS
+        );
+        final long elapsed = System.currentTimeMillis() - start.getMillis();
+        if (terminated) {
+          LOGGER.info("Finished stopping in %,dms.", elapsed);
+        } else {
+          final Set<String> stillRunning;
+          synchronized (tasks) {
+            stillRunning = ImmutableSet.copyOf(tasks.keySet());
+          }
+          LOGGER.makeAlert("Failed to stop task threads")
+                .addData("stillRunning", stillRunning)
+                .addData("elapsed", elapsed)
+                .emit();
+
+          LOGGER.warn(
+              "Executor failed to stop after %,dms, not waiting for it! Tasks still running: [%s]",
+              elapsed,
+              Joiner.on("; ").join(stillRunning)
+          );
+        }
+      }
+      catch (InterruptedException e) {
+        LOGGER.warn(e, "Interrupted while waiting for executor to finish.");
+        Thread.currentThread().interrupt();
+      }
+    } else {
+      LOGGER.warn("Ran out of time, not waiting for executor to finish!");
+    }
+  }
+
+  @Override
+  public Collection<TaskRunnerWorkItem> getRunningTasks()
+  {
+    return getTasks(RunnerTaskState.RUNNING);
+  }
+
+  @Override
+  public Collection<TaskRunnerWorkItem> getPendingTasks()
+  {
+    return getTasks(RunnerTaskState.PENDING);
+  }
+
+  @Nullable
+  @Override
+  public RunnerTaskState getRunnerTaskState(String taskId)
+  {
+    final ThreadingTaskRunnerWorkItem workItem = tasks.get(taskId);
+    return workItem == null ? null : workItem.getState();
+  }
+
+  private Collection<TaskRunnerWorkItem> getTasks(RunnerTaskState state)
+  {
+    synchronized (tasks) {
+      final List<TaskRunnerWorkItem> ret = new ArrayList<>();
+      for (final ThreadingTaskRunnerWorkItem taskWorkItem : tasks.values()) {
+        if (taskWorkItem.getState() == state) {
+          ret.add(taskWorkItem);
+        }
+      }
+      return ret;
+    }
+  }
+
+  @Override
+  public Optional<ScalingStats> getScalingStats()
+  {
+    return Optional.absent();
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(
+      Query<T> query,
+      Iterable<Interval> intervals
+  )
+  {
+    return appenderatorsManager.getQueryRunnerForIntervals(query, intervals);
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(
+      Query<T> query,
+      Iterable<SegmentDescriptor> specs
+  )
+  {
+    return appenderatorsManager.getQueryRunnerForSegments(query, specs);
+  }
+
+  protected static class ThreadingTaskRunnerWorkItem extends TaskRunnerWorkItem
+  {
+    private final Task task;
+    private volatile boolean shutdown = false;
+    private volatile ListenableFuture shutdownFuture;
+    private volatile RunnerTaskState state;
+
+    private ThreadingTaskRunnerWorkItem(
+        Task task,
+        ListenableFuture<TaskStatus> statusFuture
+    )
+    {
+      super(task.getId(), statusFuture);
+      this.task = task;
+      this.state = RunnerTaskState.PENDING;
+    }
+
+    public Task getTask()
+    {
+      return task;
+    }
+
+    @Override
+    public TaskLocation getLocation()
+    {
+      return null;
+    }
+
+    @Override
+    public String getTaskType()
+    {
+      return task.getType();
+    }
+
+    @Override
+    public String getDataSource()
+    {
+      return task.getDataSource();
+    }
+
+    public RunnerTaskState getState()
+    {
+      return state;
+    }
+
+    public void setState(RunnerTaskState state)
+    {
+      this.state = state;
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 532b2fc..03ee844 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -41,7 +41,6 @@ import org.apache.druid.concurrent.LifecycleLock;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
 import org.apache.druid.discovery.WorkerNodeService;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
@@ -438,7 +437,10 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   private void startWorkersHandling() throws InterruptedException
   {
     final CountDownLatch workerViewInitialized = new CountDownLatch(1);
-    DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER);
+
+    DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(
+        WorkerNodeService.DISCOVERY_SERVICE_KEY
+    );
     druidNodeDiscovery.registerListener(
         new DruidNodeDiscovery.Listener()
         {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 0f3d2ed..2da2f96 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -51,7 +51,7 @@ import org.apache.druid.query.QueryRunner;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -77,6 +77,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   protected final AuthorizerMapper authorizerMapper;
   protected final RowIngestionMetersFactory rowIngestionMetersFactory;
   protected final CircularBuffer<Throwable> savedParseExceptions;
+  protected final AppenderatorsManager appenderatorsManager;
   protected final LockGranularity lockGranularityToUse;
 
   // Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
@@ -94,7 +95,8 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
       @Nullable final ChatHandlerProvider chatHandlerProvider,
       final AuthorizerMapper authorizerMapper,
       final RowIngestionMetersFactory rowIngestionMetersFactory,
-      @Nullable final String groupId
+      @Nullable final String groupId,
+      AppenderatorsManager appenderatorsManager
   )
   {
     super(
@@ -117,6 +119,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     this.authorizerMapper = authorizerMapper;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
     this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
+    this.appenderatorsManager = appenderatorsManager;
     this.lockGranularityToUse = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
                                 ? LockGranularity.TIME_CHUNK
                                 : LockGranularity.SEGMENT;
@@ -185,6 +188,8 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   {
     if (taskConfig.isRestoreTasksOnRestart()) {
       getRunner().stopGracefully();
+    } else {
+      getRunner().stopForcefully();
     }
   }
 
@@ -201,7 +206,8 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
 
   public Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
   {
-    return Appenderators.createRealtime(
+    return appenderatorsManager.createRealtimeAppenderatorForTask(
+        getId(),
         dataSchema,
         tuningConfig.withBasePersistDirectory(toolbox.getPersistDir()),
         metrics,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 4dfd676..3560192 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -75,6 +75,7 @@ import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.appenderator.Appenderator;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
@@ -200,6 +201,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private final CircularBuffer<Throwable> savedParseExceptions;
   private final String stream;
   private final RowIngestionMeters rowIngestionMeters;
+  private final AppenderatorsManager appenderatorsManager;
 
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
   private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
@@ -228,6 +230,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       final Optional<ChatHandlerProvider> chatHandlerProvider,
       final CircularBuffer<Throwable> savedParseExceptions,
       final RowIngestionMetersFactory rowIngestionMetersFactory,
+      final AppenderatorsManager appenderatorsManager,
       final LockGranularity lockGranularityToUse
   )
   {
@@ -241,6 +244,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     this.savedParseExceptions = savedParseExceptions;
     this.stream = ioConfig.getStartSequenceNumbers().getStream();
     this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
+    this.appenderatorsManager = appenderatorsManager;
     this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
     this.sequences = new CopyOnWriteArrayList<>();
     this.ingestionState = IngestionState.NOT_STARTED;
@@ -257,7 +261,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     catch (Exception e) {
       log.error(e, "Encountered exception while running task.");
       final String errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(errorMsg));
+      toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(errorMsg));
       return TaskStatus.failure(
           task.getId(),
           errorMsg
@@ -385,9 +389,11 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
     Throwable caughtExceptionOuter = null;
     try (final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier = task.newTaskRecordSupplier()) {
-      toolbox.getDataSegmentServerAnnouncer().announce();
-      toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
 
+      if (appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
+        toolbox.getDataSegmentServerAnnouncer().announce();
+        toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
+      }
       appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox);
       driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics);
 
@@ -874,8 +880,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
           chatHandlerProvider.get().unregister(task.getId());
         }
 
-        toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
-        toolbox.getDataSegmentServerAnnouncer().unannounce();
+        if (appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
+          toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
+          toolbox.getDataSegmentServerAnnouncer().unannounce();
+        }
       }
       catch (Throwable e) {
         if (caughtExceptionOuter != null) {
@@ -886,7 +894,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       }
     }
 
-    toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));
+    toolbox.getTaskReportFileWriter().write(task.getId(), getTaskCompletionReports(null));
     return TaskStatus.success(task.getId());
   }
 
@@ -1363,6 +1371,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return rowIngestionMeters;
   }
 
+  public void stopForcefully()
+  {
+    log.info("Stopping forcefully (status: [%s])", status);
+    stopRequested.set(true);
+    runThread.interrupt();
+  }
 
   public void stopGracefully()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
index ac5c15c..3f22dad 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java
@@ -158,6 +158,8 @@ public abstract class WorkerTaskManager
 
     synchronized (lock) {
       try {
+        // When stopping, the task status should not be communicated to the overlord, so the listener and exec
+        // are shut down before the taskRunner is stopped.
         taskRunner.unregisterListener("WorkerTaskManager");
         exec.shutdownNow();
         taskRunner.stop();
@@ -693,7 +695,6 @@ public abstract class WorkerTaskManager
 
         changeHistory.addChangeRequest(new WorkerHistoryItem.TaskUpdate(latest));
         taskAnnouncementChanged(latest);
-
         log.info(
             "Job's finished. Completed [%s] with status [%s]",
             task.getId(),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 0575afe..36ad3a0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -119,7 +119,7 @@ public class TaskToolboxTest
         null,
         null,
         null,
-        new NoopTestTaskFileWriter()
+        new NoopTestTaskReportFileWriter()
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index 1b0430a..33011f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -28,6 +28,7 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -38,6 +39,7 @@ import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -88,6 +90,7 @@ public class TestUtils
             .addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
             .addValue(IndexingServiceClient.class, new NoopIndexingServiceClient())
             .addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of()))
+            .addValue(AppenderatorsManager.class, new TestAppenderatorsManager())
             .addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller())
     );
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index 2f568d8..482ef76 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -51,8 +51,8 @@ import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReport;
-import org.apache.druid.indexing.common.TaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -119,6 +119,7 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.ExpressionTransform;
@@ -274,6 +275,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
   private File baseDir;
   private File reportsFile;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private AppenderatorsManager appenderatorsManager;
 
   @Before
   public void setUp() throws IOException
@@ -289,6 +291,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest
     derbyConnector.createSegmentTable();
     derbyConnector.createPendingSegmentsTable();
 
+    appenderatorsManager = new TestAppenderatorsManager();
+
     baseDir = tempFolder.newFolder();
     reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
     makeToolboxFactory(baseDir);
@@ -1431,7 +1435,8 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         null,
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     )
     {
       @Override
@@ -1620,7 +1625,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),
         new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0),
-        new TaskReportFileWriter(reportsFile)
+        new SingleFileTaskReportFileWriter(reportsFile)
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 894ee08..cd00cf2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -53,6 +53,7 @@ import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
@@ -126,6 +127,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
   private final SegmentLoaderFactory segmentLoaderFactory;
   private final LockGranularity lockGranularity;
   private ExecutorService exec;
+  private AppenderatorsManager appenderatorsManager;
 
   public CompactionTaskRunTest(LockGranularity lockGranularity)
   {
@@ -140,6 +142,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
       }
     };
     segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
+    appenderatorsManager = new TestAppenderatorsManager();
     this.lockGranularity = lockGranularity;
   }
 
@@ -168,7 +171,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask compactionTask = builder
@@ -211,7 +215,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask compactionTask1 = builder
@@ -286,7 +291,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask compactionTask = builder
@@ -326,7 +332,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final Future<Pair<TaskStatus, List<DataSegment>>> compactionFuture = exec.submit(
@@ -381,7 +388,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     // day segmentGranularity
@@ -433,7 +441,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask compactionTask = builder
@@ -479,7 +488,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask compactionTask = builder
@@ -536,7 +546,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask compactionTask = builder
@@ -643,7 +654,8 @@ public class CompactionTaskRunTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     return runTask(indexTask, readyLatchToCountDown, latchToAwaitBeforeRun);
@@ -709,7 +721,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
         null,
         null,
         null,
-        new NoopTestTaskFileWriter()
+        new NoopTestTaskReportFileWriter()
     );
 
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 2494046..80eb893 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -96,6 +96,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoadingException;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
@@ -154,6 +155,7 @@ public class CompactionTaskTest
   private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
   private static Map<DataSegment, File> segmentMap = new HashMap<>();
   private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap);
+  private static AppenderatorsManager appenderatorsManager = new TestAppenderatorsManager();
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
   private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
 
@@ -244,6 +246,7 @@ public class CompactionTaskTest
                   binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
                   binder.bind(CoordinatorClient.class).toInstance(coordinatorClient);
                   binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
+                  binder.bind(AppenderatorsManager.class).toInstance(appenderatorsManager);
                 }
             )
         )
@@ -328,7 +331,8 @@ public class CompactionTaskTest
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
     final CompactionTask task = builder
         .interval(COMPACTION_INTERVAL)
@@ -352,7 +356,8 @@ public class CompactionTaskTest
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
     final CompactionTask task = builder
         .segments(SEGMENTS)
@@ -376,7 +381,8 @@ public class CompactionTaskTest
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask task = builder
@@ -824,7 +830,8 @@ public class CompactionTaskTest
         rowIngestionMetersFactory,
         coordinatorClient,
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        appenderatorsManager
     );
 
     final CompactionTask task = builder
@@ -1177,7 +1184,7 @@ public class CompactionTaskTest
           null,
           null,
           null,
-          new NoopTestTaskFileWriter()
+          new NoopTestTaskReportFileWriter()
       );
       this.segmentFileMap = segmentFileMap;
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 45ced16..d3150ae 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import org.apache.druid.segment.transform.ExpressionTransform;
@@ -140,6 +141,7 @@ public class IndexTaskTest extends IngestionTestBase
 
   private static final IndexSpec indexSpec = new IndexSpec();
   private final ObjectMapper jsonMapper;
+  private AppenderatorsManager appenderatorsManager;
   private final IndexIO indexIO;
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
   private final LockGranularity lockGranularity;
@@ -157,6 +159,8 @@ public class IndexTaskTest extends IngestionTestBase
   @Before
   public void setup() throws IOException
   {
+    appenderatorsManager = new TestAppenderatorsManager();
+
     final File cacheDir = temporaryFolder.newFolder();
     segmentLoader = new SegmentLoaderLocalCacheManager(
         indexIO,
@@ -202,7 +206,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -255,7 +260,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
@@ -300,7 +306,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -338,7 +345,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -372,7 +380,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -411,7 +420,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -486,7 +496,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     Assert.assertEquals("index_append_test", indexTask.getGroupId());
@@ -537,7 +548,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -601,7 +613,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -654,7 +667,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -702,7 +716,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -748,7 +763,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -793,7 +809,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -872,7 +889,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -926,7 +944,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     TaskStatus status = runTask(indexTask).lhs;
@@ -1022,7 +1041,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     TaskStatus status = runTask(indexTask).lhs;
@@ -1148,7 +1168,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     TaskStatus status = runTask(indexTask).lhs;
@@ -1265,7 +1286,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     TaskStatus status = runTask(indexTask).lhs;
@@ -1365,7 +1387,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -1436,7 +1459,8 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        appenderatorsManager
     );
 
     TaskStatus status = runTask(indexTask).lhs;
@@ -1484,7 +1508,8 @@ public class IndexTaskTest extends IngestionTestBase
           null,
           AuthTestUtils.TEST_AUTHORIZER_MAPPER,
           null,
-          rowIngestionMetersFactory
+          rowIngestionMetersFactory,
+          appenderatorsManager
       );
 
       final List<DataSegment> segments = runTask(indexTask).rhs;
@@ -1551,7 +1576,8 @@ public class IndexTaskTest extends IngestionTestBase
           null,
           AuthTestUtils.TEST_AUTHORIZER_MAPPER,
           null,
-          rowIngestionMetersFactory
+          rowIngestionMetersFactory,
+          appenderatorsManager
       );
 
       final List<DataSegment> segments = runTask(indexTask).rhs;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 15e9aa5..9c782a0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -24,7 +24,7 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
@@ -302,7 +302,7 @@ public abstract class IngestionTestBase
             null,
             null,
             null,
-            new TaskReportFileWriter(taskReportsFile)
+            new SingleFileTaskReportFileWriter(taskReportsFile)
         );
 
         if (task.isReady(box.getTaskActionClient())) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskFileWriter.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
similarity index 79%
rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskFileWriter.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
index 3e7b8fd..0398a21 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskFileWriter.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopTestTaskReportFileWriter.java
@@ -19,20 +19,22 @@
 
 package org.apache.druid.indexing.common.task;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.TaskReportFileWriter;
 
 import java.util.Map;
 
-public class NoopTestTaskFileWriter extends TaskReportFileWriter
+public class NoopTestTaskReportFileWriter implements TaskReportFileWriter
 {
-  public NoopTestTaskFileWriter()
+  @Override
+  public void write(String id, Map<String, TaskReport> reports)
   {
-    super(null);
   }
 
   @Override
-  public void write(Map<String, TaskReport> reports)
+  public void setObjectMapper(ObjectMapper objectMapper)
   {
+
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index a500905..29a7391 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -999,7 +999,7 @@ public class RealtimeIndexTaskTest
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),
         new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0),
-        new NoopTestTaskFileWriter()
+        new NoopTestTaskReportFileWriter()
     );
 
     return toolboxFactory.build(task);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index a58b295..189b4df 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -218,7 +218,8 @@ public class TaskSerdeTest
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        null
     );
 
     final String json = jsonMapper.writeValueAsString(task);
@@ -301,7 +302,8 @@ public class TaskSerdeTest
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        null
     );
 
     for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
new file mode 100644
index 0000000..28b15ea
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.appenderator.Appenderator;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.apache.druid.segment.realtime.appenderator.Appenderators;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.joda.time.Interval;
+
+import java.util.concurrent.ExecutorService;
+
+public class TestAppenderatorsManager implements AppenderatorsManager
+{
+  private Appenderator realtimeAppenderator;
+
+  @Override
+  public Appenderator createRealtimeAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      QueryRunnerFactoryConglomerate conglomerate,
+      DataSegmentAnnouncer segmentAnnouncer,
+      ServiceEmitter emitter,
+      ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
+  )
+  {
+    realtimeAppenderator = Appenderators.createRealtime(
+        schema,
+        config,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        indexIO,
+        indexMerger,
+        conglomerate,
+        segmentAnnouncer,
+        emitter,
+        queryExecutorService,
+        cache,
+        cacheConfig,
+        cachePopulatorStats
+    );
+    return realtimeAppenderator;
+  }
+
+  @Override
+  public Appenderator createOfflineAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger
+  )
+  {
+    return Appenderators.createOffline(
+        schema,
+        config,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        indexIO,
+        indexMerger
+    );
+  }
+
+  @Override
+  public void removeAppenderatorForTask(String taskId)
+  {
+    // nothing to remove
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(
+      Query<T> query,
+      Iterable<Interval> intervals
+  )
+  {
+    if (realtimeAppenderator != null) {
+      return realtimeAppenderator.getQueryRunnerForIntervals(query, intervals);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(
+      Query<T> query,
+      Iterable<SegmentDescriptor> specs
+  )
+  {
+    if (realtimeAppenderator != null) {
+      return realtimeAppenderator.getQueryRunnerForSegments(query, specs);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public boolean shouldTaskMakeNodeAnnouncements()
+  {
+    return true;
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 2bc229d..95467d7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -43,9 +43,10 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.IngestionTestBase;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -251,7 +252,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
         null,
         null,
         null,
-        new NoopTestTaskFileWriter()
+        new NoopTestTaskReportFileWriter()
     );
   }
 
@@ -280,7 +281,8 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
               return new AllowAllAuthorizer();
             }
           },
-          new DropwizardRowIngestionMetersFactory()
+          new DropwizardRowIngestionMetersFactory(),
+          new TestAppenderatorsManager()
       );
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index c7c8d0b..2d0678a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -396,7 +397,8 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
           ingestionSchema,
           context,
           indexingServiceClient,
-          taskClientFactory
+          taskClientFactory,
+          new TestAppenderatorsManager()
       );
     }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index c0a38c5..d62512e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskRunner.SubTaskSpecStatus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
@@ -643,7 +644,8 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
           ingestionSchema,
           context,
           null,
-          taskClientFactory
+          taskClientFactory,
+          new TestAppenderatorsManager()
       );
       this.taskClientFactory = taskClientFactory;
     }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index 0fa747f..ccd3bd0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -159,7 +160,8 @@ public class ParallelIndexSupervisorTaskSerdeTest
         new NoopIndexingServiceClient(),
         new NoopChatHandlerProvider(),
         new AuthorizerMapper(Collections.emptyMap()),
-        new DropwizardRowIngestionMetersFactory()
+        new DropwizardRowIngestionMetersFactory(),
+        new TestAppenderatorsManager()
     );
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index c6e127b..14ddf4b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -144,7 +145,8 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
           spec.getIngestionSpec(),
           spec.getContext(),
           indexingServiceClient,
-          null
+          null,
+          new TestAppenderatorsManager()
       );
       final TaskActionClient subTaskActionClient = createActionClient(subTask);
       prepareTaskForLocking(subTask);
@@ -532,7 +534,8 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
           getIngestionSpec(),
           getContext(),
           null,
-          new LocalParallelIndexTaskClientFactory(supervisorTask)
+          new LocalParallelIndexTaskClientFactory(supervisorTask),
+          new TestAppenderatorsManager()
       );
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 077ae47..59c2fec 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -23,7 +23,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
-import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
@@ -105,7 +105,7 @@ public class SingleTaskBackgroundRunnerTest
         node,
         null,
         null,
-        new TaskReportFileWriter(new File("fake"))
+        new SingleFileTaskReportFileWriter(new File("fake"))
     );
     runner = new SingleTaskBackgroundRunner(
         toolboxFactory,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index b89d8f9..cb320e8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -66,10 +66,11 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.indexing.common.task.KillTask;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
@@ -110,6 +111,7 @@ import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentTest;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.DruidNode;
@@ -228,6 +230,7 @@ public class TaskLifecycleTest
   private TaskQueueConfig tqc;
   private TaskConfig taskConfig;
   private DataSegmentPusher dataSegmentPusher;
+  private AppenderatorsManager appenderatorsManager;
 
   private int pushedSegments;
   private int announcedSinks;
@@ -529,6 +532,8 @@ public class TaskLifecycleTest
     Preconditions.checkNotNull(taskStorage);
     Preconditions.checkNotNull(emitter);
 
+    appenderatorsManager = new TestAppenderatorsManager();
+
     taskLockbox = new TaskLockbox(taskStorage, mdc);
     tac = new LocalTaskActionClientFactory(
         taskStorage,
@@ -552,6 +557,7 @@ public class TaskLifecycleTest
         return new ArrayList<>();
       }
     };
+
     return new TaskToolboxFactory(
         taskConfig,
         tac,
@@ -622,7 +628,7 @@ public class TaskLifecycleTest
         EasyMock.createNiceMock(DruidNode.class),
         new LookupNodeService("tier"),
         new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0),
-        new NoopTestTaskFileWriter()
+        new NoopTestTaskReportFileWriter()
     );
   }
 
@@ -708,7 +714,8 @@ public class TaskLifecycleTest
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        ROW_INGESTION_METERS_FACTORY
+        ROW_INGESTION_METERS_FACTORY,
+        appenderatorsManager
     );
 
     final Optional<TaskStatus> preRunTaskStatus = tsqa.getStatus(indexTask.getId());
@@ -790,7 +797,8 @@ public class TaskLifecycleTest
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        ROW_INGESTION_METERS_FACTORY
+        ROW_INGESTION_METERS_FACTORY,
+        null
     );
 
     final TaskStatus status = runTask(indexTask);
@@ -1185,7 +1193,8 @@ public class TaskLifecycleTest
         null,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        ROW_INGESTION_METERS_FACTORY
+        ROW_INGESTION_METERS_FACTORY,
+        appenderatorsManager
     );
 
     final long startTime = System.currentTimeMillis();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 5eddb2e..a94dd5e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -87,7 +87,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
@@ -178,7 +178,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
@@ -268,7 +268,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
@@ -421,7 +421,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
@@ -597,7 +597,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
@@ -771,7 +771,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
@@ -1206,7 +1206,7 @@ public class HttpRemoteTaskRunnerTest
   {
     TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
     DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
-    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
             .andReturn(druidNodeDiscovery);
     EasyMock.replay(druidNodeDiscoveryProvider);
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 6381d0f..bac1cc8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -706,7 +706,8 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
           chatHandlerProvider,
           authorizerMapper,
           rowIngestionMetersFactory,
-          groupId
+          groupId,
+          null
       );
     }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index b4c21b7..5ea1ffd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.indexing.overlord.TestTaskRunner;
@@ -131,7 +131,7 @@ public class WorkerTaskManagerTest
                 null,
                 null,
                 null,
-                new NoopTestTaskFileWriter()
+                new NoopTestTaskReportFileWriter()
             ),
             taskConfig,
             location
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 8c22aaa..c598597 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -39,7 +39,7 @@ import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner;
 import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
@@ -179,7 +179,7 @@ public class WorkerTaskMonitorTest
                 null,
                 null,
                 null,
-                new NoopTestTaskFileWriter()
+                new NoopTestTaskReportFileWriter()
             ),
             taskConfig,
             new NoopServiceEmitter(),
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 4dcb777..ee9a2e8 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -40,9 +40,9 @@ import java.util.concurrent.ConcurrentMap;
 public abstract class DruidNodeDiscoveryProvider
 {
   private static final Map<String, Set<NodeType>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
-      LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.BROKER, NodeType.HISTORICAL, NodeType.PEON),
-      DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.HISTORICAL, NodeType.PEON),
-      WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.PEON)
+      LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.BROKER, NodeType.HISTORICAL, NodeType.PEON, NodeType.INDEXER),
+      DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.HISTORICAL, NodeType.PEON, NodeType.INDEXER),
+      WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.MIDDLE_MANAGER, NodeType.INDEXER)
   );
 
   private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> serviceDiscoveryMap =
diff --git a/server/src/main/java/org/apache/druid/discovery/NodeType.java b/server/src/main/java/org/apache/druid/discovery/NodeType.java
index 841656b..1f6f90d 100644
--- a/server/src/main/java/org/apache/druid/discovery/NodeType.java
+++ b/server/src/main/java/org/apache/druid/discovery/NodeType.java
@@ -36,7 +36,8 @@ public enum NodeType
   OVERLORD("overlord"),
   PEON("peon"),
   ROUTER("router"),
-  MIDDLE_MANAGER("middleManager");
+  MIDDLE_MANAGER("middleManager"),
+  INDEXER("indexer");
 
   private final String jsonName;
 
diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java
index c173dc1..1459663 100644
--- a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java
+++ b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java
@@ -46,6 +46,7 @@ public class LookupModule implements DruidModule
 {
   static final String PROPERTY_BASE = "druid.lookup";
   public static final String FAILED_UPDATES_KEY = "failedUpdates";
+  public static final int LOOKUP_LISTENER_QOS_MAX_REQUESTS = 2;
 
   public static String getTierListenerPath(String tier)
   {
@@ -80,7 +81,7 @@ public class LookupModule implements DruidModule
     JettyBindings.addQosFilter(
         binder,
         ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY,
-        2 // 1 for "normal" operation and 1 for "emergency" or other
+        LOOKUP_LISTENER_QOS_MAX_REQUESTS // 1 for "normal" operation and 1 for "emergency" or other
     );
   }
 
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java b/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
new file mode 100644
index 0000000..e874a30
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/CliIndexerDataSegmentServerAnnouncerLifecycleHandler.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
+
+import java.io.IOException;
+
+/**
+ * Ties the {@link DataSegmentServerAnnouncer} announce/unannounce to the lifecycle start and stop.
+ *
+ * Analogous to {@link org.apache.druid.server.coordination.SegmentLoadDropHandler} on the Historicals,
+ * but without segment cache management.
+ */
+@ManageLifecycle
+public class CliIndexerDataSegmentServerAnnouncerLifecycleHandler
+{
+  private static final EmittingLogger LOG = new EmittingLogger(CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
+
+  private final DataSegmentServerAnnouncer dataSegmentServerAnnouncer;
+
+  private final LifecycleLock lifecycleLock = new LifecycleLock();
+
+  @Inject
+  public CliIndexerDataSegmentServerAnnouncerLifecycleHandler(
+      DataSegmentServerAnnouncer dataSegmentServerAnnouncer
+  )
+  {
+    this.dataSegmentServerAnnouncer = dataSegmentServerAnnouncer;
+  }
+
+  @LifecycleStart
+  public void start() throws IOException
+  {
+    if (!lifecycleLock.canStart()) {
+      throw new RuntimeException("Lifecycle lock could not start");
+    }
+
+    try {
+      if (lifecycleLock.isStarted()) {
+        return;
+      }
+
+      LOG.info("Starting...");
+      try {
+        dataSegmentServerAnnouncer.announce();
+      }
+      catch (Exception e) {
+        Throwables.propagateIfPossible(e, IOException.class);
+        throw new RuntimeException(e);
+      }
+      LOG.info("Started.");
+      lifecycleLock.started();
+    }
+    finally {
+      lifecycleLock.exitStart();
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    if (!lifecycleLock.canStop()) {
+      throw new RuntimeException("Lifecycle lock could not stop");
+    }
+
+    if (!lifecycleLock.isStarted()) {
+      return;
+    }
+
+    LOG.info("Stopping...");
+    try {
+      dataSegmentServerAnnouncer.unannounce();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    LOG.info("Stopped.");
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index 1a2fd5f..4162a5f 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -128,9 +128,7 @@ public class AppenderatorImpl implements Appenderator
    */
   private final ConcurrentMap<SegmentIdWithShardSpec, Sink> sinks = new ConcurrentHashMap<>();
   private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
-  private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
-      String.CASE_INSENSITIVE_ORDER
-  );
+  private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
   private final long maxBytesTuningConfig;
 
   private final QuerySegmentWalker texasRanger;
@@ -173,6 +171,55 @@ public class AppenderatorImpl implements Appenderator
       CachePopulatorStats cachePopulatorStats
   )
   {
+    this(
+        schema,
+        tuningConfig,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        segmentAnnouncer,
+        conglomerate == null ? null : new SinkQuerySegmentWalker(
+            schema.getDataSource(),
+            new VersionedIntervalTimeline<>(
+                String.CASE_INSENSITIVE_ORDER
+            ),
+            objectMapper,
+            emitter,
+            conglomerate,
+            queryExecutorService,
+            Preconditions.checkNotNull(cache, "cache"),
+            cacheConfig,
+            cachePopulatorStats
+        ),
+        indexIO,
+        indexMerger,
+        cache
+    );
+    log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
+  }
+
+  /**
+   * This constructor allows the caller to provide its own SinkQuerySegmentWalker.
+   *
+   * The sinkTimeline is set to the sink timeline of the provided SinkQuerySegmentWalker.
+   * If the SinkQuerySegmentWalker is null, a new sink timeline is initialized.
+   *
+   * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
+   * Appenderators.
+   */
+  AppenderatorImpl(
+      DataSchema schema,
+      AppenderatorConfig tuningConfig,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      DataSegmentAnnouncer segmentAnnouncer,
+      SinkQuerySegmentWalker sinkQuerySegmentWalker,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      Cache cache
+  )
+  {
     this.schema = Preconditions.checkNotNull(schema, "schema");
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.metrics = Preconditions.checkNotNull(metrics, "metrics");
@@ -182,21 +229,21 @@ public class AppenderatorImpl implements Appenderator
     this.indexIO = Preconditions.checkNotNull(indexIO, "indexIO");
     this.indexMerger = Preconditions.checkNotNull(indexMerger, "indexMerger");
     this.cache = cache;
-    this.texasRanger = conglomerate == null ? null : new SinkQuerySegmentWalker(
-        schema.getDataSource(),
-        sinkTimeline,
-        objectMapper,
-        emitter,
-        conglomerate,
-        queryExecutorService,
-        Preconditions.checkNotNull(cache, "cache"),
-        cacheConfig,
-        cachePopulatorStats
-    );
+    this.texasRanger = sinkQuerySegmentWalker;
+
+    if (sinkQuerySegmentWalker == null) {
+      this.sinkTimeline = new VersionedIntervalTimeline<>(
+          String.CASE_INSENSITIVE_ORDER
+      );
+    } else {
+      this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
+    }
+
     maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
     log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
   }
 
+
   @Override
   public String getDataSource()
   {
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
new file mode 100644
index 0000000..06d3f6e
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.joda.time.Interval;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * This interface defines entities that create and manage potentially multiple {@link Appenderator} instances.
+ *
+ * The AppenderatorsManager should be used by tasks running in a Peon or an CliIndexer process when it needs
+ * an Appenderator.
+ *
+ * The AppenderatorsManager also provides methods for creating {@link QueryRunner} instances that read the data
+ * held by the Appenderators created through the AppenderatorsManager.
+ *
+ * In later updates, this interface will be used to manage memory usage across multiple Appenderators,
+ * useful for the Indexer where all Tasks run in the same process.
+ *
+ * The methods on AppenderatorsManager can be called by multiple threads.
+ *
+ * This class provides similar functionality to the {@link org.apache.druid.server.coordination.ServerManager} and
+ * {@link org.apache.druid.server.SegmentManager} on the Historical processes.
+ */
+public interface AppenderatorsManager
+{
+  /**
+   * Creates an Appenderator suited for realtime ingestion. Note that this method's parameters include objects
+   * used for query processing.
+   */
+  Appenderator createRealtimeAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      QueryRunnerFactoryConglomerate conglomerate,
+      DataSegmentAnnouncer segmentAnnouncer,
+      ServiceEmitter emitter,
+      ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
+  );
+
+  /**
+   * Creates an Appenderator suited for batch ingestion.
+   */
+  Appenderator createOfflineAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger
+  );
+
+  /**
+   * Removes any internal Appenderator-tracking state associated with the provided taskId.
+   *
+   * This method should be called when a task is finished using its Appenderator that was previously created by
+   * createRealtimeAppenderatorForTask or createOfflineAppenderatorForTask.
+   *
+   * The method can be called by the entity managing Tasks when the Tasks finish, such as ThreadingTaskRunner.
+   */
+  void removeAppenderatorForTask(String taskId);
+
+  /**
+   * Returns a query runner for the given intervals over the Appenderators managed by this AppenderatorsManager.
+   */
+  <T> QueryRunner<T> getQueryRunnerForIntervals(
+      Query<T> query,
+      Iterable<Interval> intervals
+  );
+
+  /**
+   * Returns a query runner for the given segment specs over the Appenderators managed by this AppenderatorsManager.
+   */
+  <T> QueryRunner<T> getQueryRunnerForSegments(
+      Query<T> query,
+      Iterable<SegmentDescriptor> specs
+  );
+
+  /**
+   * As AppenderatorsManager implementions are service dependent (i.e., Peons and Indexers have different impls),
+   * this method allows Tasks to know whether they should announce themselves as nodes and segment servers
+   * to the rest of the cluster.
+   *
+   * Only Tasks running in Peons (i.e., as separate processes) should make their own individual node announcements.
+   */
+  boolean shouldTaskMakeNodeAnnouncements();
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
new file mode 100644
index 0000000..23b4567
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.java.util.common.UOE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.joda.time.Interval;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * This implementation is needed because Overlords and MiddleManagers operate on Task objects which
+ * can require an AppenderatorsManager to be injected.
+ *
+ * The methods of this implementation throw exceptions because the Overlord/MM should never be calling
+ * the AppenderatorsManager.
+ */
+public class DummyForInjectionAppenderatorsManager implements AppenderatorsManager
+{
+  private static final String ERROR_MSG =
+      "AppenderatorsManager methods should only called by services that run tasks directly.";
+
+  @Override
+  public Appenderator createRealtimeAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      QueryRunnerFactoryConglomerate conglomerate,
+      DataSegmentAnnouncer segmentAnnouncer,
+      ServiceEmitter emitter,
+      ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
+  )
+  {
+    throw new UOE(ERROR_MSG);
+  }
+
+  @Override
+  public Appenderator createOfflineAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger
+  )
+  {
+    throw new UOE(ERROR_MSG);
+  }
+
+  @Override
+  public void removeAppenderatorForTask(String taskId)
+  {
+    throw new UOE(ERROR_MSG);
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(
+      Query<T> query,
+      Iterable<Interval> intervals
+  )
+  {
+    throw new UOE(ERROR_MSG);
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(
+      Query<T> query,
+      Iterable<SegmentDescriptor> specs
+  )
+  {
+    throw new UOE(ERROR_MSG);
+  }
+
+  @Override
+  public boolean shouldTaskMakeNodeAnnouncements()
+  {
+    throw new UOE(ERROR_MSG);
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
new file mode 100644
index 0000000..7563b15
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.joda.time.Interval;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Manages Appenderators for tasks running within a CliPeon process.
+ *
+ * It provides the ability to create a realtime appenderator or multiple batch appenderators,
+ * and serves queries on the realtime appenderator.
+ *
+ * The implementation contains sanity checks that throw errors if more than one realtime appenderator is created,
+ * or if a task tries to create both realtime and batch appenderators. These checks can be adjusted if these
+ * assumptions are no longer true.
+ *
+ * Because the peon is a separate process that will terminate after task completion, this implementation
+ * relies on process shutdown for resource cleanup.
+ */
+public class PeonAppenderatorsManager implements AppenderatorsManager
+{
+  private Appenderator realtimeAppenderator;
+  private Appenderator batchAppenderator;
+
+  @Override
+  public Appenderator createRealtimeAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      QueryRunnerFactoryConglomerate conglomerate,
+      DataSegmentAnnouncer segmentAnnouncer,
+      ServiceEmitter emitter,
+      ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
+  )
+  {
+    if (realtimeAppenderator != null) {
+      throw new ISE("A realtime appenderator was already created for this peon's task.");
+    } else if (batchAppenderator != null) {
+      throw new ISE("A batch appenderator was already created for this peon's task.");
+    } else {
+      realtimeAppenderator = Appenderators.createRealtime(
+          schema,
+          config,
+          metrics,
+          dataSegmentPusher,
+          objectMapper,
+          indexIO,
+          indexMerger,
+          conglomerate,
+          segmentAnnouncer,
+          emitter,
+          queryExecutorService,
+          cache,
+          cacheConfig,
+          cachePopulatorStats
+      );
+    }
+    return realtimeAppenderator;
+  }
+
+  @Override
+  public Appenderator createOfflineAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger
+  )
+  {
+    // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators
+    if (realtimeAppenderator != null) {
+      throw new ISE("A realtime appenderator was already created for this peon's task.");
+    } else {
+      batchAppenderator = Appenderators.createOffline(
+          schema,
+          config,
+          metrics,
+          dataSegmentPusher,
+          objectMapper,
+          indexIO,
+          indexMerger
+      );
+      return batchAppenderator;
+    }
+  }
+
+  @Override
+  public void removeAppenderatorForTask(String taskId)
+  {
+    // the peon only runs one task, and the process will shutdown later, don't need to do anything
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(
+      Query<T> query,
+      Iterable<Interval> intervals
+  )
+  {
+    if (realtimeAppenderator == null) {
+      throw new ISE("Was asked for a query runner but realtimeAppenderator was null!");
+    } else {
+      return realtimeAppenderator.getQueryRunnerForIntervals(query, intervals);
+    }
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(
+      Query<T> query,
+      Iterable<SegmentDescriptor> specs
+  )
+  {
+    if (realtimeAppenderator == null) {
+      throw new ISE("Was asked for a query runner but realtimeAppenderator was null!");
+    } else {
+      return realtimeAppenderator.getQueryRunnerForSegments(query, specs);
+    }
+  }
+
+  @Override
+  public boolean shouldTaskMakeNodeAnnouncements()
+  {
+    return true;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
index 7d08c0f..69cef4c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java
@@ -74,6 +74,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
   private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
 
   private final String dataSource;
+
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
   private final ObjectMapper objectMapper;
   private final ServiceEmitter emitter;
@@ -312,6 +313,11 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
     );
   }
 
+  public VersionedIntervalTimeline<String, Sink> getSinkTimeline()
+  {
+    return sinkTimeline;
+  }
+
   public static String makeHydrantCacheIdentifier(FireHydrant input)
   {
     return input.getSegmentId() + "_" + input.getCount();
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
new file mode 100644
index 0000000..4aa3593
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.guice.annotations.Processing;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMerger;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.FireDepartmentMetrics;
+import org.apache.druid.segment.realtime.plumber.Sink;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Manages Appenderators for the Indexer task execution service, which runs all tasks in a single process.
+ *
+ * This class keeps two maps:
+ * - A per-datasource SinkQuerySegmentWalker (with an associated per-datasource timeline)
+ * - A map that associates a taskId with the Appenderator created for that task
+ *
+ * Appenderators created by this class will use the shared per-datasource SinkQuerySegmentWalkers.
+ *
+ * The per-datasource SinkQuerySegmentWalkers share a common queryExecutorService.
+ */
+public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
+{
+  private final ConcurrentHashMap<String, SinkQuerySegmentWalker> datasourceSegmentWalkers = new ConcurrentHashMap<>();
+
+  private final ExecutorService queryExecutorService;
+  private final Cache cache;
+  private final CacheConfig cacheConfig;
+  private final CachePopulatorStats cachePopulatorStats;
+
+  @Inject
+  public UnifiedIndexerAppenderatorsManager(
+      @Processing ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
+  )
+  {
+    this.queryExecutorService = queryExecutorService;
+    this.cache = cache;
+    this.cacheConfig = cacheConfig;
+    this.cachePopulatorStats = cachePopulatorStats;
+  }
+
+  @Override
+  public Appenderator createRealtimeAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger,
+      QueryRunnerFactoryConglomerate conglomerate,
+      DataSegmentAnnouncer segmentAnnouncer,
+      ServiceEmitter emitter,
+      ExecutorService queryExecutorService,
+      Cache cache,
+      CacheConfig cacheConfig,
+      CachePopulatorStats cachePopulatorStats
+  )
+  {
+    SinkQuerySegmentWalker segmentWalker = datasourceSegmentWalkers.computeIfAbsent(
+        schema.getDataSource(),
+        (datasource) -> {
+          VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
+              String.CASE_INSENSITIVE_ORDER
+          );
+          SinkQuerySegmentWalker datasourceSegmentWalker = new SinkQuerySegmentWalker(
+              schema.getDataSource(),
+              sinkTimeline,
+              objectMapper,
+              emitter,
+              conglomerate,
+              this.queryExecutorService,
+              Preconditions.checkNotNull(this.cache, "cache"),
+              this.cacheConfig,
+              this.cachePopulatorStats
+          );
+          return datasourceSegmentWalker;
+        }
+    );
+
+    Appenderator appenderator = new AppenderatorImpl(
+        schema,
+        config,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        segmentAnnouncer,
+        segmentWalker,
+        indexIO,
+        indexMerger,
+        cache
+    );
+
+    return appenderator;
+  }
+
+  @Override
+  public Appenderator createOfflineAppenderatorForTask(
+      String taskId,
+      DataSchema schema,
+      AppenderatorConfig config,
+      FireDepartmentMetrics metrics,
+      DataSegmentPusher dataSegmentPusher,
+      ObjectMapper objectMapper,
+      IndexIO indexIO,
+      IndexMerger indexMerger
+  )
+  {
+    Appenderator appenderator = Appenderators.createOffline(
+        schema,
+        config,
+        metrics,
+        dataSegmentPusher,
+        objectMapper,
+        indexIO,
+        indexMerger
+    );
+    return appenderator;
+  }
+
+  @Override
+  public void removeAppenderatorForTask(String taskId)
+  {
+    // nothing to remove presently
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForIntervals(
+      Query<T> query,
+      Iterable<Interval> intervals
+  )
+  {
+    SinkQuerySegmentWalker segmentWalker = datasourceSegmentWalkers.get(query.getDataSource().toString());
+    if (segmentWalker == null) {
+      throw new IAE("Could not find segment walker for datasource [%s]", query.getDataSource().toString());
+    }
+    return segmentWalker.getQueryRunnerForIntervals(query, intervals);
+  }
+
+  @Override
+  public <T> QueryRunner<T> getQueryRunnerForSegments(
+      Query<T> query,
+      Iterable<SegmentDescriptor> specs
+  )
+  {
+    SinkQuerySegmentWalker segmentWalker = datasourceSegmentWalkers.get(query.getDataSource().toString());
+    if (segmentWalker == null) {
+      throw new IAE("Could not find segment walker for datasource [%s]", query.getDataSource().toString());
+    }
+    return segmentWalker.getQueryRunnerForSegments(query, specs);
+  }
+
+  @Override
+  public boolean shouldTaskMakeNodeAnnouncements()
+  {
+    return false;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java
index 98f508f..60a7315 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java
@@ -32,7 +32,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import java.util.List;
 
-@Path("/druid/worker/v1")
+@Path("/druid/worker/v1/chat")
 public class ChatHandlerResource
 {
   public static final String TASK_ID_HEADER = "X-Druid-Task-Id";
@@ -47,7 +47,7 @@ public class ChatHandlerResource
     this.taskId = taskIdHolder.getTaskId();
   }
 
-  @Path("/chat/{id}")
+  @Path("/{id}")
   public Object doTaskChat(@PathParam("id") String handlerId, @Context HttpHeaders headers)
   {
     if (taskId != null) {
diff --git a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java
index 0bc8713..926b90f 100644
--- a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java
@@ -74,6 +74,11 @@ public class ClusterResource
       entityBuilder.put(NodeType.MIDDLE_MANAGER, mmNodes);
     }
 
+    Collection<Object> indexerNodes = getNodes(NodeType.INDEXER, full);
+    if (!indexerNodes.isEmpty()) {
+      entityBuilder.put(NodeType.INDEXER, indexerNodes);
+    }
+
     Collection<Object> routerNodes = getNodes(NodeType.ROUTER, full);
     if (!routerNodes.isEmpty()) {
       entityBuilder.put(NodeType.ROUTER, routerNodes);
diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
index cc86003..fcd2ccf 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java
@@ -34,12 +34,50 @@ import java.util.zip.Deflater;
  */
 public class ServerConfig
 {
-
   public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096;
 
+  /**
+   * The ServerConfig is normally created using {@link org.apache.druid.guice.JsonConfigProvider} binding.
+   *
+   * This constructor is provided for callers that need to create a ServerConfig object with specific field values.
+   */
+  public ServerConfig(
+      int numThreads,
+      int queueSize,
+      boolean enableRequestLimit,
+      @NotNull Period maxIdleTime,
+      long defaultQueryTimeout,
+      long maxScatterGatherBytes,
+      long maxQueryTimeout,
+      int maxRequestHeaderSize,
+      @NotNull Period gracefulShutdownTimeout,
+      @NotNull Period unannouncePropagationDelay,
+      int inflateBufferSize,
+      int compressionLevel
+  )
+  {
+    this.numThreads = numThreads;
+    this.queueSize = queueSize;
+    this.enableRequestLimit = enableRequestLimit;
+    this.maxIdleTime = maxIdleTime;
+    this.defaultQueryTimeout = defaultQueryTimeout;
+    this.maxScatterGatherBytes = maxScatterGatherBytes;
+    this.maxQueryTimeout = maxQueryTimeout;
+    this.maxRequestHeaderSize = maxRequestHeaderSize;
+    this.gracefulShutdownTimeout = gracefulShutdownTimeout;
+    this.unannouncePropagationDelay = unannouncePropagationDelay;
+    this.inflateBufferSize = inflateBufferSize;
+    this.compressionLevel = compressionLevel;
+  }
+
+  public ServerConfig()
+  {
+
+  }
+
   @JsonProperty
   @Min(1)
-  private int numThreads = Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + 30;
+  private int numThreads = getDefaultNumThreads();
 
   @JsonProperty
   @Min(1)
@@ -207,4 +245,9 @@ public class ServerConfig
            ", compressionLevel=" + compressionLevel +
            '}';
   }
+
+  public static int getDefaultNumThreads()
+  {
+    return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + 30;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
index 6eafec8..b1ff033 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
@@ -62,7 +62,11 @@ public class ChatHandlerServerModule implements Module
 
     if (properties.containsKey(MAX_CHAT_REQUESTS_PROPERTY)) {
       final int maxRequests = Integer.parseInt(properties.getProperty(MAX_CHAT_REQUESTS_PROPERTY));
-      JettyBindings.addQosFilter(binder, "/druid/worker/v1/chat/*", maxRequests);
+      JettyBindings.addQosFilter(
+          binder,
+          "/druid/worker/v1/chat/*",
+          maxRequests
+      );
     }
 
     Multibinder.newSetBinder(binder, ServletFilterHolder.class).addBinding().to(TaskIdResponseHeaderFilterHolder.class);
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
similarity index 59%
copy from server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
copy to server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
index 6eafec8..a3a456b 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/ChatHandlerServerModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java
@@ -31,6 +31,7 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.RemoteChatHandler;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.query.lookup.LookupModule;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -44,12 +45,12 @@ import java.util.Properties;
 
 /**
  */
-public class ChatHandlerServerModule implements Module
+public class CliIndexerServerModule implements Module
 {
-  private static final String MAX_CHAT_REQUESTS_PROPERTY = "druid.indexer.server.maxChatRequests";
+  private static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads";
   private final Properties properties;
 
-  public ChatHandlerServerModule(Properties properties)
+  public CliIndexerServerModule(Properties properties)
   {
     this.properties = properties;
   }
@@ -60,11 +61,40 @@ public class ChatHandlerServerModule implements Module
     Jerseys.addResource(binder, ChatHandlerResource.class);
     LifecycleModule.register(binder, ChatHandlerResource.class);
 
-    if (properties.containsKey(MAX_CHAT_REQUESTS_PROPERTY)) {
-      final int maxRequests = Integer.parseInt(properties.getProperty(MAX_CHAT_REQUESTS_PROPERTY));
-      JettyBindings.addQosFilter(binder, "/druid/worker/v1/chat/*", maxRequests);
+    // Use an equal number of threads for chat handler and non-chat handler requests.
+    int serverHttpNumThreads;
+    if (properties.getProperty(SERVER_HTTP_NUM_THREADS_PROPERTY) == null) {
+      serverHttpNumThreads = ServerConfig.getDefaultNumThreads();
+    } else {
+      serverHttpNumThreads = Integer.parseInt(properties.getProperty(SERVER_HTTP_NUM_THREADS_PROPERTY));
     }
 
+    JettyBindings.addQosFilter(
+        binder,
+        "/druid/worker/v1/chat/*",
+        serverHttpNumThreads
+    );
+
+    String[] notChatPaths = new String[]{
+        "/druid/v2/*", // QueryResource
+        "/status/*", // StatusResource
+        "/druid-internal/*", // SegmentListerResource, TaskManagementResource
+        "/druid/worker/v1/enable", // WorkerResource
+        "/druid/worker/v1/disable", // WorkerResource
+        "/druid/worker/v1/enabled", // WorkerResource
+        "/druid/worker/v1/tasks", // WorkerResource
+        "/druid/worker/v1/task/*", // WorkerResource
+        "/druid/v1/lookups/*", // LookupIntrospectionResource
+        "/druid-ext/*" // basic-security
+    };
+    JettyBindings.addQosFilter(
+        binder,
+        notChatPaths,
+        serverHttpNumThreads
+    );
+
+    // Be aware that lookups have a 2 maxRequest QoS filter as well.
+
     Multibinder.newSetBinder(binder, ServletFilterHolder.class).addBinding().to(TaskIdResponseHeaderFilterHolder.class);
 
     /**
@@ -100,10 +130,34 @@ public class ChatHandlerServerModule implements Module
         injector,
         lifecycle,
         node,
-        config,
+        makeAdjustedServerConfig(config),
         TLSServerConfig,
         injector.getExistingBinding(Key.get(SslContextFactory.class)),
         injector.getInstance(TLSCertificateChecker.class)
     );
   }
+
+  /**
+   * Adjusts the ServerConfig such that we double the number of configured HTTP threads,
+   * with one half allocated using QoS to chat handler requests, and the other half for other requests.
+   *
+   * 2 dedicated threads are added for lookup listening, which also has a QoS filter applied.
+   */
+  public ServerConfig makeAdjustedServerConfig(ServerConfig oldConfig)
+  {
+    return new ServerConfig(
+        (oldConfig.getNumThreads() * 2) + LookupModule.LOOKUP_LISTENER_QOS_MAX_REQUESTS,
+        oldConfig.getQueueSize(),
+        oldConfig.isEnableRequestLimit(),
+        oldConfig.getMaxIdleTime(),
+        oldConfig.getDefaultQueryTimeout(),
+        oldConfig.getMaxScatterGatherBytes(),
+        oldConfig.getMaxQueryTimeout(),
+        oldConfig.getMaxRequestHeaderSize(),
+        oldConfig.getGracefulShutdownTimeout(),
+        oldConfig.getUnannouncePropagationDelay(),
+        oldConfig.getInflateBufferSize(),
+        oldConfig.getCompressionLevel()
+    );
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
index 3bcbafb..9942acf 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyBindings.java
@@ -40,7 +40,7 @@ public class JettyBindings
     // No instantiation.
   }
 
-  public static void addQosFilter(Binder binder, String path, int maxRequests)
+  public static void addQosFilter(Binder binder, String paths, int maxRequests)
   {
     if (maxRequests <= 0) {
       return;
@@ -48,7 +48,18 @@ public class JettyBindings
 
     Multibinder.newSetBinder(binder, ServletFilterHolder.class)
                .addBinding()
-               .toInstance(new QosFilterHolder(path, maxRequests));
+               .toInstance(new QosFilterHolder(new String[]{paths}, maxRequests));
+  }
+
+  public static void addQosFilter(Binder binder, String[] paths, int maxRequests)
+  {
+    if (maxRequests <= 0) {
+      return;
+    }
+
+    Multibinder.newSetBinder(binder, ServletFilterHolder.class)
+               .addBinding()
+               .toInstance(new QosFilterHolder(paths, maxRequests));
   }
 
   public static void addHandler(Binder binder, Class<? extends Handler> handlerClass)
@@ -60,12 +71,12 @@ public class JettyBindings
 
   private static class QosFilterHolder implements ServletFilterHolder
   {
-    private final String path;
+    private final String[] paths;
     private final int maxRequests;
 
-    public QosFilterHolder(String path, int maxRequests)
+    public QosFilterHolder(String[] paths, int maxRequests)
     {
-      this.path = path;
+      this.paths = paths;
       this.maxRequests = maxRequests;
     }
 
@@ -90,7 +101,13 @@ public class JettyBindings
     @Override
     public String getPath()
     {
-      return path;
+      return null;
+    }
+
+    @Override
+    public String[] getPaths()
+    {
+      return paths;
     }
 
     @Override
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerInitUtils.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerInitUtils.java
index 0b02343..ee1b5a6 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerInitUtils.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/JettyServerInitUtils.java
@@ -27,9 +27,11 @@ import org.eclipse.jetty.server.Handler;
 import org.eclipse.jetty.server.handler.RequestLogHandler;
 import org.eclipse.jetty.server.handler.gzip.GzipHandler;
 import org.eclipse.jetty.servlet.FilterHolder;
+import org.eclipse.jetty.servlet.FilterMapping;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 
 import javax.ws.rs.HttpMethod;
+import java.util.Arrays;
 import java.util.Set;
 
 public class JettyServerInitUtils
@@ -63,14 +65,23 @@ public class JettyServerInitUtils
       } else if (servletFilterHolder.getFilterClass() != null) {
         holder = new FilterHolder(servletFilterHolder.getFilterClass());
       } else {
-        throw new ISE("Filter[%s] for path[%s] didn't have a Filter!?", servletFilterHolder, servletFilterHolder.getPath());
+        throw new ISE(
+            "Filter[%s] for paths[%s] didn't have a Filter!?",
+            servletFilterHolder,
+            Arrays.toString(servletFilterHolder.getPaths())
+        );
       }
 
       if (servletFilterHolder.getInitParameters() != null) {
         holder.setInitParameters(servletFilterHolder.getInitParameters());
       }
 
-      handler.addFilter(holder, servletFilterHolder.getPath(), servletFilterHolder.getDispatcherType());
+      FilterMapping filterMapping = new FilterMapping();
+      filterMapping.setFilterName(holder.getName());
+      filterMapping.setPathSpecs(servletFilterHolder.getPaths());
+      filterMapping.setDispatcherTypes(servletFilterHolder.getDispatcherType());
+
+      handler.getServletHandler().addFilter(holder, filterMapping);
     }
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/ServletFilterHolder.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/ServletFilterHolder.java
index 373c28b..a3a43a1 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/ServletFilterHolder.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/ServletFilterHolder.java
@@ -38,7 +38,6 @@ import java.util.Map;
 @ExtensionPoint
 public interface ServletFilterHolder
 {
-
   /**
    * Get the Filter object that should be added to the servlet.
    *
@@ -68,13 +67,26 @@ public interface ServletFilterHolder
   Map<String, String> getInitParameters();
 
   /**
+   * This method is deprecated, please implement {@link #getPaths()}.
+   *
    * The path that this Filter should apply to
    *
    * @return the path that this Filter should apply to
    */
+  @Deprecated
   String getPath();
 
   /**
+   * The paths that this Filter should apply to
+   *
+   * @return the paths that this Filter should apply to
+   */
+  default String[] getPaths()
+  {
+    return new String[]{getPath()};
+  }
+
+  /**
    * The dispatcher type that this Filter should apply to
    *
    * @return the enumeration of DispatcherTypes that this Filter should apply to
diff --git a/server/src/test/java/org/apache/druid/initialization/ServerConfigSerdeTest.java b/server/src/test/java/org/apache/druid/initialization/ServerConfigSerdeTest.java
new file mode 100644
index 0000000..a67899c
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/initialization/ServerConfigSerdeTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.initialization;
+
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.server.initialization.ServerConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ServerConfigSerdeTest
+{
+  private static final DefaultObjectMapper objectMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    ServerConfig defaultConfig = new ServerConfig();
+    String defaultConfigJson = objectMapper.writeValueAsString(defaultConfig);
+    ServerConfig defaultConfig2 = objectMapper.readValue(defaultConfigJson, ServerConfig.class);
+    Assert.assertEquals(defaultConfig, defaultConfig2);
+
+    ServerConfig modifiedConfig = new ServerConfig(
+        999,
+        888,
+        defaultConfig.isEnableRequestLimit(),
+        defaultConfig.getMaxIdleTime(),
+        defaultConfig.getDefaultQueryTimeout(),
+        defaultConfig.getMaxScatterGatherBytes(),
+        defaultConfig.getMaxQueryTimeout(),
+        defaultConfig.getMaxRequestHeaderSize(),
+        defaultConfig.getGracefulShutdownTimeout(),
+        defaultConfig.getUnannouncePropagationDelay(),
+        defaultConfig.getInflateBufferSize(),
+        defaultConfig.getCompressionLevel()
+    );
+    String modifiedConfigJson = objectMapper.writeValueAsString(modifiedConfig);
+    ServerConfig modifiedConfig2 = objectMapper.readValue(modifiedConfigJson, ServerConfig.class);
+    Assert.assertEquals(modifiedConfig, modifiedConfig2);
+    Assert.assertEquals(999, modifiedConfig2.getNumThreads());
+    Assert.assertEquals(888, modifiedConfig2.getQueueSize());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
index c49a548..c61e963 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/LocalDataSegmentPusherTest.java
@@ -165,6 +165,8 @@ public class LocalDataSegmentPusherTest
   {
     config.storageDirectory = new File("/druid");
 
+    // If this test fails because the path is returned as "file:/druid/", this can happen
+    // when a /druid directory exists on the local filesystem.
     Assert.assertEquals(
         "file:/druid",
         new LocalDataSegmentPusher(config).getPathForHadoop()
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
similarity index 50%
copy from services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
copy to services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 68d374c..45756c6 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -21,18 +21,15 @@ package org.apache.druid.cli;
 
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
-import com.google.inject.Key;
+import com.google.inject.Inject;
 import com.google.inject.Module;
 import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
-import com.google.inject.multibindings.MapBinder;
 import com.google.inject.name.Names;
-import com.google.inject.util.Providers;
 import io.airlift.airline.Command;
-import org.apache.druid.client.indexing.HttpIndexingServiceClient;
-import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.discovery.NodeType;
 import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.guice.DruidProcessingModule;
 import org.apache.druid.guice.IndexingServiceFirehoseModule;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.IndexingServiceTaskLogsModule;
@@ -40,44 +37,50 @@ import org.apache.druid.guice.Jerseys;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.PolyBind;
+import org.apache.druid.guice.NodeTypeConfig;
+import org.apache.druid.guice.QueryRunnerFactoryModule;
+import org.apache.druid.guice.QueryableModule;
+import org.apache.druid.guice.QueryablePeonModule;
+import org.apache.druid.guice.annotations.RemoteChatHandler;
 import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
-import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskClient;
-import org.apache.druid.indexing.overlord.ForkingTaskRunner;
+import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
 import org.apache.druid.indexing.overlord.TaskRunner;
+import org.apache.druid.indexing.overlord.ThreadingTaskRunner;
 import org.apache.druid.indexing.worker.Worker;
-import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
-import org.apache.druid.indexing.worker.WorkerTaskMonitor;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.indexing.worker.http.ShuffleResource;
-import org.apache.druid.indexing.worker.http.TaskManagementResource;
-import org.apache.druid.indexing.worker.http.WorkerResource;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.lookup.LookupSerdeModule;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.lookup.LookupModule;
+import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
 import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.http.SegmentListerResource;
+import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.eclipse.jetty.server.Server;
 
 import java.util.List;
+import java.util.Properties;
 
 /**
  *
  */
 @Command(
-    name = "middleManager",
-    description = "Runs a Middle Manager, this is a \"task\" node used as part of the remote indexing service, see https://druid.apache.org/docs/latest/design/middlemanager.html for a description"
+    name = "indexer",
+    description = "Runs an Indexer. The Indexer is a task execution process that runs each task in a separate thread."
 )
-public class CliMiddleManager extends ServerRunnable
+public class CliIndexer extends ServerRunnable
 {
-  private static final Logger log = new Logger(CliMiddleManager.class);
+  private static final Logger log = new Logger(CliIndexer.class);
 
-  public CliMiddleManager()
+  @Inject
+  private Properties properties;
+
+  public CliIndexer()
   {
     super(log);
   }
@@ -86,59 +89,63 @@ public class CliMiddleManager extends ServerRunnable
   protected List<? extends Module> getModules()
   {
     return ImmutableList.of(
+        new DruidProcessingModule(),
+        new QueryableModule(),
+        new QueryRunnerFactoryModule(),
         new Module()
         {
           @Override
           public void configure(Binder binder)
           {
-            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/middlemanager");
+            binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/indexer");
             binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8091);
-            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8291);
+            binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(8091);
 
             IndexingServiceModuleHelper.configureTaskRunnerConfigs(binder);
 
-            JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
             JsonConfigProvider.bind(binder, "druid.worker", WorkerConfig.class);
 
-            binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
-            binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
+            CliPeon.bindTaskConfigAndClients(binder);
 
-            binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
-            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
-                  .toProvider(Providers.of(null));
-            binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
-            PolyBind.createChoice(
-                binder,
-                "druid.indexer.task.rowIngestionMeters.type",
-                Key.get(RowIngestionMetersFactory.class),
-                Key.get(DropwizardRowIngestionMetersFactory.class)
-            );
-            final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
-                PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
-            rowIngestionMetersHandlerProviderBinder
-                .addBinding("dropwizard")
-                .to(DropwizardRowIngestionMetersFactory.class)
-                .in(LazySingleton.class);
-            binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
+            binder.bind(TaskReportFileWriter.class).toInstance(new MultipleFileTaskReportFileWriter());
+
+            binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class);
+            binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class);
+            binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class);
+
+            CliPeon.bindRowIngestionMeters(binder);
+
+            CliPeon.bindChatHandler(binder);
 
+            CliPeon.bindPeonDataSegmentHandlers(binder);
 
-            binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
-            binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
+            CliPeon.bindRealtimeCache(binder);
 
-            LifecycleModule.register(binder, WorkerTaskMonitor.class);
-            binder.bind(JettyServerInitializer.class)
-                  .to(MiddleManagerJettyServerInitializer.class)
+            CliPeon.bindCoordinatorHandoffNotiferAndClient(binder);
+
+            CliMiddleManager.bindWorkerManagementClasses(binder);
+
+            binder.bind(AppenderatorsManager.class)
+                  .to(UnifiedIndexerAppenderatorsManager.class)
                   .in(LazySingleton.class);
-            Jerseys.addResource(binder, WorkerResource.class);
-            Jerseys.addResource(binder, TaskManagementResource.class);
+
+            binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.INDEXER_EXECUTOR));
+
+            binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
+            Jerseys.addResource(binder, SegmentListerResource.class);
+
+            LifecycleModule.register(binder, CliIndexerDataSegmentServerAnnouncerLifecycleHandler.class);
+
             Jerseys.addResource(binder, ShuffleResource.class);
 
-            LifecycleModule.register(binder, Server.class);
+            LifecycleModule.register(binder, Server.class, RemoteChatHandler.class);
 
             bindAnnouncer(
                 binder,
-                DiscoverySideEffectsProvider.builder(NodeType.MIDDLE_MANAGER)
-                                            .serviceClasses(ImmutableList.of(WorkerNodeService.class))
+                DiscoverySideEffectsProvider.builder(NodeType.INDEXER)
+                                            .serviceClasses(
+                                                ImmutableList.of(LookupNodeService.class, WorkerNodeService.class)
+                                            )
                                             .build()
             );
           }
@@ -169,7 +176,9 @@ public class CliMiddleManager extends ServerRunnable
         },
         new IndexingServiceFirehoseModule(),
         new IndexingServiceTaskLogsModule(),
-        new LookupSerdeModule()
+        new QueryablePeonModule(),
+        new CliIndexerServerModule(properties),
+        new LookupModule()
     );
   }
 }
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 68d374c..571d7d6 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -59,6 +59,8 @@ import org.apache.druid.indexing.worker.http.TaskManagementResource;
 import org.apache.druid.indexing.worker.http.WorkerResource;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
@@ -121,16 +123,16 @@ public class CliMiddleManager extends ServerRunnable
                 .in(LazySingleton.class);
             binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
 
+            bindWorkerManagementClasses(binder);
 
-            binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
-            binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
-
-            LifecycleModule.register(binder, WorkerTaskMonitor.class);
             binder.bind(JettyServerInitializer.class)
                   .to(MiddleManagerJettyServerInitializer.class)
                   .in(LazySingleton.class);
-            Jerseys.addResource(binder, WorkerResource.class);
-            Jerseys.addResource(binder, TaskManagementResource.class);
+
+            binder.bind(AppenderatorsManager.class)
+                  .to(DummyForInjectionAppenderatorsManager.class)
+                  .in(LazySingleton.class);
+
             Jerseys.addResource(binder, ShuffleResource.class);
 
             LifecycleModule.register(binder, Server.class);
@@ -172,4 +174,13 @@ public class CliMiddleManager extends ServerRunnable
         new LookupSerdeModule()
     );
   }
+
+  public static void bindWorkerManagementClasses(Binder binder)
+  {
+    binder.bind(WorkerTaskMonitor.class).in(ManageLifecycle.class);
+    binder.bind(WorkerCuratorCoordinator.class).in(ManageLifecycle.class);
+    LifecycleModule.register(binder, WorkerTaskMonitor.class);
+    Jerseys.addResource(binder, WorkerResource.class);
+    Jerseys.addResource(binder, TaskManagementResource.class);
+  }
 }
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index db103b3..d8a6ec9 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -95,6 +95,8 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.lookup.LookupSerdeModule;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.audit.AuditManagerProvider;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
@@ -240,6 +242,11 @@ public class CliOverlord extends ServerRunnable
             Jerseys.addResource(binder, SupervisorResource.class);
             Jerseys.addResource(binder, HttpRemoteTaskRunnerResource.class);
 
+
+            binder.bind(AppenderatorsManager.class)
+                  .to(DummyForInjectionAppenderatorsManager.class)
+                  .in(LazySingleton.class);
+
             if (standalone) {
               LifecycleModule.register(binder, Server.class);
             }
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index bbf11bd..4a6c2d8 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -58,6 +58,7 @@ import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SingleFileTaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskReportFileWriter;
 import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
@@ -91,6 +92,8 @@ import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
 import org.apache.druid.segment.loading.OmniDataSegmentKiller;
 import org.apache.druid.segment.loading.OmniDataSegmentMover;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.appenderator.PeonAppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
@@ -166,62 +169,13 @@ public class CliPeon extends GuiceRunnable
             binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
             binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
 
-            PolyBind.createChoice(
-                binder,
-                "druid.indexer.task.rowIngestionMeters.type",
-                Key.get(RowIngestionMetersFactory.class),
-                Key.get(DropwizardRowIngestionMetersFactory.class)
-            );
-            final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
-                PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
-            rowIngestionMetersHandlerProviderBinder
-                .addBinding("dropwizard")
-                .to(DropwizardRowIngestionMetersFactory.class)
-                .in(LazySingleton.class);
-            binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
-
-            PolyBind.createChoice(
-                binder,
-                "druid.indexer.task.chathandler.type",
-                Key.get(ChatHandlerProvider.class),
-                Key.get(ServiceAnnouncingChatHandlerProvider.class)
-            );
-            final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
-                PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
-            handlerProviderBinder
-                .addBinding("announce")
-                .to(ServiceAnnouncingChatHandlerProvider.class)
-                .in(LazySingleton.class);
-            handlerProviderBinder
-                .addBinding("noop")
-                .to(NoopChatHandlerProvider.class)
-                .in(LazySingleton.class);
-            binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
-
-            binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
-
-            binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
-
-            JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
-            JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
-            JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);
-
-            configureTaskActionClient(binder);
-            binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
-
-            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
-                  .to(ParallelIndexTaskClientFactory.class)
-                  .in(LazySingleton.class);
+            bindRowIngestionMeters(binder);
 
-            binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
+            bindChatHandler(binder);
 
-            // Build it to make it bind even if nothing binds to it.
-            Binders.dataSegmentKillerBinder(binder);
-            binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
-            Binders.dataSegmentMoverBinder(binder);
-            binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
-            Binders.dataSegmentArchiverBinder(binder);
-            binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
+            bindTaskConfigAndClients(binder);
+
+            bindPeonDataSegmentHandlers(binder);
 
             binder.bind(ExecutorLifecycle.class).in(ManageLifecycle.class);
             LifecycleModule.register(binder, ExecutorLifecycle.class);
@@ -232,7 +186,7 @@ public class CliPeon extends GuiceRunnable
             );
 
             binder.bind(TaskReportFileWriter.class).toInstance(
-                new TaskReportFileWriter(
+                new SingleFileTaskReportFileWriter(
                     new File(taskReportPath)
                 )
             );
@@ -241,19 +195,13 @@ public class CliPeon extends GuiceRunnable
             binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
             binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
 
-            JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
-            binder.install(new CacheModule());
+            bindRealtimeCache(binder);
 
-            JsonConfigProvider.bind(
-                binder,
-                "druid.segment.handoff",
-                CoordinatorBasedSegmentHandoffNotifierConfig.class
-            );
-            binder.bind(SegmentHandoffNotifierFactory.class)
-                  .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
-                  .in(LazySingleton.class);
+            bindCoordinatorHandoffNotiferAndClient(binder);
 
-            binder.bind(CoordinatorClient.class).in(LazySingleton.class);
+            binder.bind(AppenderatorsManager.class)
+                  .to(PeonAppenderatorsManager.class)
+                  .in(LazySingleton.class);
 
             binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
             Jerseys.addResource(binder, SegmentListerResource.class);
@@ -261,33 +209,6 @@ public class CliPeon extends GuiceRunnable
             LifecycleModule.register(binder, Server.class);
           }
 
-          private void configureTaskActionClient(Binder binder)
-          {
-            PolyBind.createChoice(
-                binder,
-                "druid.peon.mode",
-                Key.get(TaskActionClientFactory.class),
-                Key.get(RemoteTaskActionClientFactory.class)
-            );
-            final MapBinder<String, TaskActionClientFactory> taskActionBinder =
-                PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
-            taskActionBinder
-                .addBinding("local")
-                .to(LocalTaskActionClientFactory.class)
-                .in(LazySingleton.class);
-            // all of these bindings are so that we can run the peon in local mode
-            JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
-            binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
-            binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
-            binder.bind(IndexerMetadataStorageCoordinator.class)
-                  .to(IndexerSQLMetadataStorageCoordinator.class)
-                  .in(LazySingleton.class);
-            taskActionBinder
-                .addBinding("remote")
-                .to(RemoteTaskActionClientFactory.class)
-                .in(LazySingleton.class);
-          }
-
           @Provides
           @LazySingleton
           public Task readTask(@Json ObjectMapper mapper, ExecutorLifecycleConfig config)
@@ -383,4 +304,120 @@ public class CliPeon extends GuiceRunnable
       throw new RuntimeException(e);
     }
   }
+
+  public static void bindRowIngestionMeters(Binder binder)
+  {
+    PolyBind.createChoice(
+        binder,
+        "druid.indexer.task.rowIngestionMeters.type",
+        Key.get(RowIngestionMetersFactory.class),
+        Key.get(DropwizardRowIngestionMetersFactory.class)
+    );
+    final MapBinder<String, RowIngestionMetersFactory> rowIngestionMetersHandlerProviderBinder =
+        PolyBind.optionBinder(binder, Key.get(RowIngestionMetersFactory.class));
+    rowIngestionMetersHandlerProviderBinder
+        .addBinding("dropwizard")
+        .to(DropwizardRowIngestionMetersFactory.class)
+        .in(LazySingleton.class);
+    binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
+  }
+
+  public static void bindChatHandler(Binder binder)
+  {
+    PolyBind.createChoice(
+        binder,
+        "druid.indexer.task.chathandler.type",
+        Key.get(ChatHandlerProvider.class),
+        Key.get(ServiceAnnouncingChatHandlerProvider.class)
+    );
+    final MapBinder<String, ChatHandlerProvider> handlerProviderBinder =
+        PolyBind.optionBinder(binder, Key.get(ChatHandlerProvider.class));
+    handlerProviderBinder
+        .addBinding("announce")
+        .to(ServiceAnnouncingChatHandlerProvider.class)
+        .in(LazySingleton.class);
+    handlerProviderBinder
+        .addBinding("noop")
+        .to(NoopChatHandlerProvider.class)
+        .in(LazySingleton.class);
+    binder.bind(ServiceAnnouncingChatHandlerProvider.class).in(LazySingleton.class);
+    binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
+  }
+
+  public static void bindPeonDataSegmentHandlers(Binder binder)
+  {
+    // Build it to make it bind even if nothing binds to it.
+    Binders.dataSegmentKillerBinder(binder);
+    binder.bind(DataSegmentKiller.class).to(OmniDataSegmentKiller.class).in(LazySingleton.class);
+    Binders.dataSegmentMoverBinder(binder);
+    binder.bind(DataSegmentMover.class).to(OmniDataSegmentMover.class).in(LazySingleton.class);
+    Binders.dataSegmentArchiverBinder(binder);
+    binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
+  }
+
+  public static void configureTaskActionClient(Binder binder)
+  {
+    PolyBind.createChoice(
+        binder,
+        "druid.peon.mode",
+        Key.get(TaskActionClientFactory.class),
+        Key.get(RemoteTaskActionClientFactory.class)
+    );
+    final MapBinder<String, TaskActionClientFactory> taskActionBinder =
+        PolyBind.optionBinder(binder, Key.get(TaskActionClientFactory.class));
+    taskActionBinder
+        .addBinding("local")
+        .to(LocalTaskActionClientFactory.class)
+        .in(LazySingleton.class);
+    // all of these bindings are so that we can run the peon in local mode
+    JsonConfigProvider.bind(binder, "druid.indexer.storage", TaskStorageConfig.class);
+    binder.bind(TaskStorage.class).to(HeapMemoryTaskStorage.class).in(LazySingleton.class);
+    binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
+    binder.bind(IndexerMetadataStorageCoordinator.class)
+          .to(IndexerSQLMetadataStorageCoordinator.class)
+          .in(LazySingleton.class);
+    taskActionBinder
+        .addBinding("remote")
+        .to(RemoteTaskActionClientFactory.class)
+        .in(LazySingleton.class);
+  }
+
+  public static void bindTaskConfigAndClients(Binder binder)
+  {
+    binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
+
+    JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
+    JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
+    JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class);
+
+    configureTaskActionClient(binder);
+    binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
+
+    binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
+          .to(ParallelIndexTaskClientFactory.class)
+          .in(LazySingleton.class);
+
+    binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
+  }
+
+  public static void bindRealtimeCache(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
+    binder.install(new CacheModule());
+  }
+
+  public static void bindCoordinatorHandoffNotiferAndClient(Binder binder)
+  {
+    JsonConfigProvider.bind(
+        binder,
+        "druid.segment.handoff",
+        CoordinatorBasedSegmentHandoffNotifierConfig.class
+    );
+    binder.bind(SegmentHandoffNotifierFactory.class)
+          .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
+          .in(LazySingleton.class);
+
+    binder.bind(CoordinatorClient.class).in(LazySingleton.class);
+  }
+
 }
diff --git a/services/src/main/java/org/apache/druid/cli/Main.java b/services/src/main/java/org/apache/druid/cli/Main.java
index 0cc4427..f966a69 100644
--- a/services/src/main/java/org/apache/druid/cli/Main.java
+++ b/services/src/main/java/org/apache/druid/cli/Main.java
@@ -60,6 +60,7 @@ public class Main
         CliHistorical.class,
         CliBroker.class,
         CliOverlord.class,
+        CliIndexer.class,
         CliMiddleManager.class,
         CliRouter.class
     );
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 7bfb10e..4f0b519 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -421,6 +421,13 @@ public class SystemSchemaTest extends CalciteTestBase
           DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
   );
 
+  private final DiscoveryDruidNode indexer = new DiscoveryDruidNode(
+      new DruidNode("s8", "indexerHost", false, 8091, null, true, false),
+      NodeType.INDEXER,
+      ImmutableMap.of(
+          DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0))
+  );
+
   private final ImmutableDruidServer druidServer1 = new ImmutableDruidServer(
       new DruidServerMetadata("server1", "localhost:0000", null, 5L, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0),
       1L,
@@ -686,6 +693,8 @@ public class SystemSchemaTest extends CalciteTestBase
     final DruidNodeDiscovery historicalNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
     final DruidNodeDiscovery mmNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
     final DruidNodeDiscovery peonNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
+    final DruidNodeDiscovery indexerNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
+
 
     EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR))
             .andReturn(coordinatorNodeDiscovery)
@@ -701,6 +710,9 @@ public class SystemSchemaTest extends CalciteTestBase
     EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
             .andReturn(mmNodeDiscovery)
             .once();
+    EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.INDEXER))
+            .andReturn(indexerNodeDiscovery)
+            .once();
     EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(peonNodeDiscovery).once();
 
     EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once();
@@ -710,6 +722,7 @@ public class SystemSchemaTest extends CalciteTestBase
     EasyMock.expect(historicalNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(historical1, historical2)).once();
     EasyMock.expect(mmNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(middleManager)).once();
     EasyMock.expect(peonNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(peon1, peon2)).once();
+    EasyMock.expect(indexerNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(indexer)).once();
 
     final DruidServer server1 = EasyMock.createMock(DruidServer.class);
     EasyMock.expect(serverInventoryView.getInventoryValue(historical1.toDruidServer().getName())).andReturn(server1).once();
@@ -726,7 +739,8 @@ public class SystemSchemaTest extends CalciteTestBase
         routerNodeDiscovery,
         historicalNodeDiscovery,
         mmNodeDiscovery,
-        peonNodeDiscovery
+        peonNodeDiscovery,
+        indexerNodeDiscovery
     );
 
     DataContext dataContext = new DataContext()
@@ -757,7 +771,7 @@ public class SystemSchemaTest extends CalciteTestBase
     };
     final List<Object[]> rows = serversTable.scan(dataContext).toList();
     rows.sort((Object[] row1, Object[] row2) -> ((Comparable) row1[0]).compareTo(row2[0]));
-    Assert.assertEquals(10, rows.size());
+    Assert.assertEquals(11, rows.size());
     verifyServerRow(
         rows.get(0),
         "brokerHost:8082",
@@ -782,6 +796,17 @@ public class SystemSchemaTest extends CalciteTestBase
     );
     verifyServerRow(
         rows.get(2),
+        "indexerHost:8091",
+        "indexerHost",
+        8091,
+        -1,
+        "indexer",
+        null,
+        0,
+        0
+    );
+    verifyServerRow(
+        rows.get(3),
         "localhost:8080",
         "localhost",
         8080,
@@ -792,7 +817,7 @@ public class SystemSchemaTest extends CalciteTestBase
         0
     );
     verifyServerRow(
-        rows.get(3),
+        rows.get(4),
         "localhost:8081",
         "localhost",
         8081,
@@ -803,7 +828,7 @@ public class SystemSchemaTest extends CalciteTestBase
         0
     );
     verifyServerRow(
-        rows.get(4),
+        rows.get(5),
         "localhost:8082",
         "localhost",
         8082,
@@ -814,7 +839,7 @@ public class SystemSchemaTest extends CalciteTestBase
         0
     );
     verifyServerRow(
-        rows.get(5),
+        rows.get(6),
         "localhost:8083",
         "localhost",
         8083,
@@ -825,7 +850,7 @@ public class SystemSchemaTest extends CalciteTestBase
         1000
     );
     verifyServerRow(
-        rows.get(6),
+        rows.get(7),
         "localhost:8090",
         "localhost",
         8090,
@@ -836,7 +861,7 @@ public class SystemSchemaTest extends CalciteTestBase
         0
     );
     verifyServerRow(
-        rows.get(7),
+        rows.get(8),
         "localhost:8888",
         "localhost",
         8888,
@@ -847,7 +872,7 @@ public class SystemSchemaTest extends CalciteTestBase
         0
     );
     verifyServerRow(
-        rows.get(8),
+        rows.get(9),
         "mmHost:8091",
         "mmHost",
         8091,
@@ -858,7 +883,7 @@ public class SystemSchemaTest extends CalciteTestBase
         0
     );
     verifyServerRow(
-        rows.get(9),
+        rows.get(10),
         "peonHost:8080",
         "peonHost",
         8080,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org