Posted to commits@druid.apache.org by ji...@apache.org on 2019/02/24 01:03:04 UTC

[incubator-druid] branch master updated: ParallelIndexSubTask: support ingestSegment in delegating factories (#7089)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c2753a  ParallelIndexSubTask: support ingestSegment in delegating factories (#7089)
1c2753a is described below

commit 1c2753ab9033ff0c785d6e80f5f7c07dc34c3889
Author: David Glasser <gl...@apollographql.com>
AuthorDate: Sat Feb 23 17:02:56 2019 -0800

    ParallelIndexSubTask: support ingestSegment in delegating factories (#7089)
    
    IndexTask had special-cased code to properly send a TaskToolbox to an
    IngestSegmentFirehoseFactory that's nested inside a CombiningFirehoseFactory,
    but ParallelIndexSubTask didn't.
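
    For reference, the special-casing being removed is roughly the following
    (condensed from the IndexTask hunk in the diff below):

        // Walk the factory, recursing into CombiningFirehoseFactory delegates,
        // and hand the toolbox to any IngestSegmentFirehoseFactory found.
        private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
        {
          if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
            ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
          } else if (firehoseFactory instanceof CombiningFirehoseFactory) {
            for (FirehoseFactory delegate : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
              setFirehoseFactoryToolbox(delegate, toolbox);
            }
          }
        }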
    
    This change refactors IngestSegmentFirehoseFactory so that it doesn't need a
    TaskToolbox; it instead gets a CoordinatorClient, a SegmentLoaderFactory, and
    a RetryPolicyFactory directly injected into it.
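
    Concretely, the factory's constructor now receives its dependencies via
    Jackson injection (condensed from the IngestSegmentFirehoseFactory hunk in
    the diff below):

        @JsonCreator
        public IngestSegmentFirehoseFactory(
            @JsonProperty("dataSource") String dataSource,
            @JsonProperty("interval") Interval interval,
            // ... filter, dimensions, metrics ...
            @JacksonInject IndexIO indexIO,
            @JacksonInject CoordinatorClient coordinatorClient,
            @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
            @JacksonInject RetryPolicyFactory retryPolicyFactory
        )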
    
    This also refactors SegmentLoaderFactory so it doesn't depend on
    an injectable SegmentLoaderConfig, since its only method always
    replaces the preconfigured SegmentLoaderConfig anyway.
    This makes it possible to use SegmentLoaderFactory without setting
    druid.segmentCache.locations to some dummy value.
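
    After the refactor, manufacturate() builds a fresh cache manager scoped to
    the task's storage directory instead of rewriting an injected one (from the
    SegmentLoaderFactory hunk below):

        public SegmentLoader manufacturate(File storageDir)
        {
          // The only cache location is the per-task storage dir, so no global
          // segment-cache configuration is required.
          return new SegmentLoaderLocalCacheManager(
              indexIO,
              new SegmentLoaderConfig().withLocations(
                  Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
              jsonMapper
          );
        }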
    
    Another goal of this PR is to make it possible for IngestSegmentFirehoseFactory
    to list data segments outside of connect() --- specifically, to make it a
    FiniteFirehoseFactory which can query the coordinator in order to calculate its
    splits. See #7048.
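
    As a rough, hypothetical sketch of that direction (none of this is in this
    commit; the split payload type and method shape are assumptions):

        // Hypothetical: with the CoordinatorClient injected, splits could be
        // computed eagerly, one per used segment, without a TaskToolbox.
        public Stream<InputSplit<DataSegment>> getSplits()
        {
          return coordinatorClient
              .getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval))
              .stream()
              .map(InputSplit::new);
        }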
    
    This also adds missing datasource name URL-encoding to an API used by
    CoordinatorBasedSegmentHandoffNotifier.
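
    The encoding fix amounts to something like the following (a hedged sketch;
    the exact endpoint path is an assumption, not taken from this diff):

        // Datasource names may contain characters that are not URL-safe, so
        // encode them before splicing them into the request path.
        final String path = StringUtils.format(
            "/druid/coordinator/v1/datasources/%s",
            StringUtils.urlEncode(dataSource)
        );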
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   5 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   5 +-
 .../indexing/common/SegmentLoaderFactory.java      |  20 +-
 .../druid/indexing/common/task/CompactionTask.java | 113 +++++++--
 .../druid/indexing/common/task/IndexTask.java      |  23 --
 .../task/batch/parallel/ParallelIndexSubTask.java  |   6 -
 .../firehose/IngestSegmentFirehoseFactory.java     |  81 +++++--
 .../druid/indexing/common/TaskToolboxTest.java     |  11 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   5 +-
 .../common/task/CompactionTaskRunTest.java         |  37 ++-
 .../indexing/common/task/CompactionTaskTest.java   | 143 ++++++++++--
 .../common/task/RealtimeIndexTaskTest.java         |   5 +-
 .../firehose/IngestSegmentFirehoseFactoryTest.java | 257 +++++----------------
 .../IngestSegmentFirehoseFactoryTimelineTest.java  |  94 ++------
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   3 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   5 +-
 .../indexing/worker/WorkerTaskManagerTest.java     |   3 +-
 .../indexing/worker/WorkerTaskMonitorTest.java     |  21 +-
 .../client/coordinator/CoordinatorClient.java      |  49 +++-
 .../loading/SegmentLoaderLocalCacheManager.java    |   8 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |   8 -
 21 files changed, 458 insertions(+), 444 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1a55128..892349f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -139,7 +139,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
@@ -2559,9 +2558,7 @@ public class KafkaIndexTaskTest
         this::makeTimeseriesAndScanConglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index f0ef143..5294116 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -135,7 +135,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -2765,9 +2764,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         this::makeTimeseriesOnlyConglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
index b15d3df..83fa9db 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
@@ -19,7 +19,10 @@
 
 package org.apache.druid.indexing.common;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
@@ -29,23 +32,30 @@ import java.io.File;
 import java.util.Collections;
 
 /**
+ *
  */
 public class SegmentLoaderFactory
 {
-  private final SegmentLoaderLocalCacheManager loader;
+  private final IndexIO indexIO;
+  private final ObjectMapper jsonMapper;
 
   @Inject
   public SegmentLoaderFactory(
-      SegmentLoaderLocalCacheManager loader
+      IndexIO indexIO,
+      @Json ObjectMapper mapper
   )
   {
-    this.loader = loader;
+    this.indexIO = indexIO;
+    this.jsonMapper = mapper;
   }
 
   public SegmentLoader manufacturate(File storageDir)
   {
-    return loader.withConfig(
-        new SegmentLoaderConfig().withLocations(Collections.singletonList(new StorageLocationConfig().setPath(storageDir)))
+    return new SegmentLoaderLocalCacheManager(
+        indexIO,
+        new SegmentLoaderConfig().withLocations(
+            Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
+        jsonMapper
     );
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 740d78d..c897de3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -29,6 +29,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Lists;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -40,6 +41,8 @@ import org.apache.druid.data.input.impl.NoopInputRowParser;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -133,6 +136,15 @@ public class CompactionTask extends AbstractTask
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
 
   @JsonIgnore
+  private final CoordinatorClient coordinatorClient;
+
+  @JsonIgnore
+  private final SegmentLoaderFactory segmentLoaderFactory;
+
+  @JsonIgnore
+  private final RetryPolicyFactory retryPolicyFactory;
+
+  @JsonIgnore
   private List<IndexTask> indexTaskSpecs;
 
   @JsonCreator
@@ -153,7 +165,10 @@ public class CompactionTask extends AbstractTask
       @JacksonInject ObjectMapper jsonMapper,
       @JacksonInject AuthorizerMapper authorizerMapper,
       @JacksonInject ChatHandlerProvider chatHandlerProvider,
-      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
+      @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
+      @JacksonInject CoordinatorClient coordinatorClient,
+      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
     super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
@@ -186,6 +201,9 @@ public class CompactionTask extends AbstractTask
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+    this.coordinatorClient = coordinatorClient;
+    this.segmentLoaderFactory = segmentLoaderFactory;
+    this.retryPolicyFactory = retryPolicyFactory;
   }
 
   @JsonProperty
@@ -278,20 +296,23 @@ public class CompactionTask extends AbstractTask
           metricsSpec,
           keepSegmentGranularity,
           segmentGranularity,
-          jsonMapper
+          jsonMapper,
+          coordinatorClient,
+          segmentLoaderFactory,
+          retryPolicyFactory
       ).stream()
-      .map(spec -> new IndexTask(
-          getId(),
-          getGroupId(),
-          getTaskResource(),
-          getDataSource(),
-          spec,
-          getContext(),
-          authorizerMapper,
-          chatHandlerProvider,
-          rowIngestionMetersFactory
-      ))
-      .collect(Collectors.toList());
+       .map(spec -> new IndexTask(
+           getId(),
+           getGroupId(),
+           getTaskResource(),
+           getDataSource(),
+           spec,
+           getContext(),
+           authorizerMapper,
+           chatHandlerProvider,
+           rowIngestionMetersFactory
+       ))
+       .collect(Collectors.toList());
     }
 
     if (indexTaskSpecs.isEmpty()) {
@@ -338,7 +359,10 @@ public class CompactionTask extends AbstractTask
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final Boolean keepSegmentGranularity,
       @Nullable final Granularity segmentGranularity,
-      final ObjectMapper jsonMapper
+      final ObjectMapper jsonMapper,
+      final CoordinatorClient coordinatorClient,
+      final SegmentLoaderFactory segmentLoaderFactory,
+      final RetryPolicyFactory retryPolicyFactory
   ) throws IOException, SegmentLoadingException
   {
     Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
@@ -379,7 +403,14 @@ public class CompactionTask extends AbstractTask
         return Collections.singletonList(
             new IndexIngestionSpec(
                 dataSchema,
-                createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+                createIoConfig(
+                    toolbox,
+                    dataSchema,
+                    segmentProvider.interval,
+                    coordinatorClient,
+                    segmentLoaderFactory,
+                    retryPolicyFactory
+                ),
                 compactionTuningConfig
             )
         );
@@ -411,7 +442,14 @@ public class CompactionTask extends AbstractTask
           specs.add(
               new IndexIngestionSpec(
                   dataSchema,
-                  createIoConfig(toolbox, dataSchema, interval),
+                  createIoConfig(
+                      toolbox,
+                      dataSchema,
+                      interval,
+                      coordinatorClient,
+                      segmentLoaderFactory,
+                      retryPolicyFactory
+                  ),
                   compactionTuningConfig
               )
           );
@@ -438,7 +476,14 @@ public class CompactionTask extends AbstractTask
         return Collections.singletonList(
             new IndexIngestionSpec(
                 dataSchema,
-                createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+                createIoConfig(
+                    toolbox,
+                    dataSchema,
+                    segmentProvider.interval,
+                    coordinatorClient,
+                    segmentLoaderFactory,
+                    retryPolicyFactory
+                ),
                 compactionTuningConfig
             )
         );
@@ -446,7 +491,14 @@ public class CompactionTask extends AbstractTask
     }
   }
 
-  private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
+  private static IndexIOConfig createIoConfig(
+      TaskToolbox toolbox,
+      DataSchema dataSchema,
+      Interval interval,
+      CoordinatorClient coordinatorClient,
+      SegmentLoaderFactory segmentLoaderFactory,
+      RetryPolicyFactory retryPolicyFactory
+  )
   {
     return new IndexIOConfig(
         new IngestSegmentFirehoseFactory(
@@ -456,7 +508,10 @@ public class CompactionTask extends AbstractTask
             // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
             dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
             Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
-            toolbox.getIndexIO()
+            toolbox.getIndexIO(),
+            coordinatorClient,
+            segmentLoaderFactory,
+            retryPolicyFactory
         ),
         false
     );
@@ -811,7 +866,7 @@ public class CompactionTask extends AbstractTask
      * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment},
      * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together.
      * {@link #hasPartitionConfig} checks one of those configs is set.
-     *
+     * <p>
      * This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
      * returns true. If targetCompactionSizeBytes is not set, this returns null or
      * {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of
@@ -860,6 +915,9 @@ public class CompactionTask extends AbstractTask
     private final AuthorizerMapper authorizerMapper;
     private final ChatHandlerProvider chatHandlerProvider;
     private final RowIngestionMetersFactory rowIngestionMetersFactory;
+    private final CoordinatorClient coordinatorClient;
+    private final SegmentLoaderFactory segmentLoaderFactory;
+    private final RetryPolicyFactory retryPolicyFactory;
 
     @Nullable
     private Interval interval;
@@ -885,7 +943,10 @@ public class CompactionTask extends AbstractTask
         ObjectMapper jsonMapper,
         AuthorizerMapper authorizerMapper,
         ChatHandlerProvider chatHandlerProvider,
-        RowIngestionMetersFactory rowIngestionMetersFactory
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        CoordinatorClient coordinatorClient,
+        SegmentLoaderFactory segmentLoaderFactory,
+        RetryPolicyFactory retryPolicyFactory
     )
     {
       this.dataSource = dataSource;
@@ -893,6 +954,9 @@ public class CompactionTask extends AbstractTask
       this.authorizerMapper = authorizerMapper;
       this.chatHandlerProvider = chatHandlerProvider;
       this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+      this.coordinatorClient = coordinatorClient;
+      this.segmentLoaderFactory = segmentLoaderFactory;
+      this.retryPolicyFactory = retryPolicyFactory;
     }
 
     public Builder interval(Interval interval)
@@ -968,7 +1032,10 @@ public class CompactionTask extends AbstractTask
           jsonMapper,
           authorizerMapper,
           chatHandlerProvider,
-          rowIngestionMetersFactory
+          rowIngestionMetersFactory,
+          coordinatorClient,
+          segmentLoaderFactory,
+          retryPolicyFactory
       );
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 6d5a0d8..d0e083a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -54,7 +54,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMeters;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -84,7 +83,6 @@ import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
 import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
 import org.apache.druid.segment.realtime.firehose.ChatHandler;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -419,8 +417,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
       final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
 
-      setFirehoseFactoryToolbox(firehoseFactory, toolbox);
-
       final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
       // Firehose temporary directory is automatically removed when this IndexTask completes.
       FileUtils.forceMkdir(firehoseTempDir);
@@ -489,25 +485,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
     }
   }
 
-  // pass toolbox to any IngestSegmentFirehoseFactory
-  private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
-  {
-    if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
-      ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
-      return;
-    }
-
-    if (firehoseFactory instanceof CombiningFirehoseFactory) {
-      for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
-        if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
-          ((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
-        } else if (delegateFactory instanceof CombiningFirehoseFactory) {
-          setFirehoseFactoryToolbox(delegateFactory, toolbox);
-        }
-      }
-    }
-  }
-
   private Map<String, TaskReport> getTaskCompletionReports()
   {
     return TaskReport.buildTaskReports(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 8004243..435de05 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -44,7 +44,6 @@ import org.apache.druid.indexing.common.task.IndexTask;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.Tasks;
-import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -185,11 +184,6 @@ public class ParallelIndexSubTask extends AbstractTask
   {
     final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
 
-    if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
-      // pass toolbox to Firehose
-      ((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
-    }
-
     final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
     // Firehose temporary directory is automatically removed when this IndexTask completes.
     FileUtils.forceMkdir(firehoseTempDir);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 8087582..bae2946 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -30,16 +30,20 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
+import org.apache.druid.indexing.common.RetryPolicy;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.loading.SegmentLoader;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
@@ -48,14 +52,17 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.Duration;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -68,7 +75,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
   private final List<String> dimensions;
   private final List<String> metrics;
   private final IndexIO indexIO;
-  private TaskToolbox taskToolbox;
+  private final CoordinatorClient coordinatorClient;
+  private final SegmentLoaderFactory segmentLoaderFactory;
+  private final RetryPolicyFactory retryPolicyFactory;
 
   @JsonCreator
   public IngestSegmentFirehoseFactory(
@@ -77,7 +86,10 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("dimensions") List<String> dimensions,
       @JsonProperty("metrics") List<String> metrics,
-      @JacksonInject IndexIO indexIO
+      @JacksonInject IndexIO indexIO,
+      @JacksonInject CoordinatorClient coordinatorClient,
+      @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
+      @JacksonInject RetryPolicyFactory retryPolicyFactory
   )
   {
     Preconditions.checkNotNull(dataSource, "dataSource");
@@ -88,6 +100,9 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     this.dimensions = dimensions;
     this.metrics = metrics;
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
+    this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
+    this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
+    this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
   }
 
   @JsonProperty
@@ -120,23 +135,46 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     return metrics;
   }
 
-  public void setTaskToolbox(TaskToolbox taskToolbox)
-  {
-    this.taskToolbox = taskToolbox;
-  }
-
   @Override
   public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException
   {
     log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval);
 
-    Preconditions.checkNotNull(taskToolbox, "taskToolbox is not set");
-
     try {
-      final List<DataSegment> usedSegments = taskToolbox
-          .getTaskActionClient()
-          .submit(new SegmentListUsedAction(dataSource, interval, null));
-      final Map<DataSegment, File> segmentFileMap = taskToolbox.fetchSegments(usedSegments);
+      // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
+      // as TaskActionClient.
+      final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
+      List<DataSegment> usedSegments;
+      while (true) {
+        try {
+          usedSegments =
+              coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
+          break;
+        }
+        catch (Throwable e) {
+          log.warn(e, "Exception getting database segments");
+          final Duration delay = retryPolicy.getAndIncrementRetryDelay();
+          if (delay == null) {
+            throw e;
+          } else {
+            final long sleepTime = jitter(delay.getMillis());
+            log.info("Will try again in [%s].", new Duration(sleepTime).toString());
+            try {
+              Thread.sleep(sleepTime);
+            }
+            catch (InterruptedException e2) {
+              throw new RuntimeException(e2);
+            }
+          }
+        }
+      }
+
+      final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
+      Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
+      for (DataSegment segment : usedSegments) {
+        segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+      }
+
       final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = VersionedIntervalTimeline
           .forSegments(usedSegments)
           .lookup(interval);
@@ -201,11 +239,18 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
       final TransformSpec transformSpec = TransformSpec.fromInputRowParser(inputRowParser);
       return new IngestSegmentFirehose(adapters, transformSpec, dims, metricsList, dimFilter);
     }
-    catch (IOException | SegmentLoadingException e) {
+    catch (SegmentLoadingException e) {
       throw Throwables.propagate(e);
     }
   }
 
+  private long jitter(long input)
+  {
+    final double jitter = ThreadLocalRandom.current().nextGaussian() * input / 4.0;
+    long retval = input + (long) jitter;
+    return retval < 0 ? 0 : retval;
+  }
+
   @VisibleForTesting
   static List<String> getUniqueDimensions(
       List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
@@ -260,7 +305,7 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
 
     final BiMap<Integer, String> orderedMetrics = uniqueMetrics.inverse();
     return IntStream.range(0, orderedMetrics.size())
-        .mapToObj(orderedMetrics::get)
-        .collect(Collectors.toList());
+                    .mapToObj(orderedMetrics::get)
+                    .collect(Collectors.toList());
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index d9acfd6..0966d1b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -76,6 +76,7 @@ public class TaskToolboxTest
   private MonitorScheduler mockMonitorScheduler = EasyMock.createMock(MonitorScheduler.class);
   private ExecutorService mockQueryExecutorService = EasyMock.createMock(ExecutorService.class);
   private ObjectMapper ObjectMapper = new ObjectMapper();
+  private SegmentLoaderFactory mockSegmentLoaderFactory = EasyMock.createMock(SegmentLoaderFactory.class);
   private SegmentLoaderLocalCacheManager mockSegmentLoaderLocalCacheManager = EasyMock.createMock(SegmentLoaderLocalCacheManager.class);
   private Task task = EasyMock.createMock(Task.class);
   private IndexMergerV9 mockIndexMergerV9 = EasyMock.createMock(IndexMergerV9.class);
@@ -107,7 +108,7 @@ public class TaskToolboxTest
         () -> mockQueryRunnerFactoryConglomerate,
         mockQueryExecutorService,
         mockMonitorScheduler,
-        new SegmentLoaderFactory(mockSegmentLoaderLocalCacheManager),
+        mockSegmentLoaderFactory,
         ObjectMapper,
         mockIndexIO,
         mockCache,
@@ -163,12 +164,12 @@ public class TaskToolboxTest
   {
     File expectedFile = temporaryFolder.newFile();
     EasyMock
+        .expect(mockSegmentLoaderFactory.manufacturate(EasyMock.anyObject()))
+        .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
+    EasyMock
         .expect(mockSegmentLoaderLocalCacheManager.getSegmentFiles(EasyMock.anyObject()))
         .andReturn(expectedFile).anyTimes();
-    EasyMock
-        .expect(mockSegmentLoaderLocalCacheManager.withConfig(EasyMock.anyObject()))
-        .andReturn(mockSegmentLoaderLocalCacheManager).anyTimes();
-    EasyMock.replay(mockSegmentLoaderLocalCacheManager);
+    EasyMock.replay(mockSegmentLoaderFactory, mockSegmentLoaderLocalCacheManager);
     DataSegment dataSegment = DataSegment.builder().dataSource("source").interval(Intervals.of("2012-01-01/P1D")).version("1").size(1).build();
     List<DataSegment> segments = ImmutableList.of
         (
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index b57397a..b809e7a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -119,7 +119,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -1607,9 +1606,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         () -> conglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index bbf2870..6ad0ec4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -23,11 +23,15 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
@@ -52,6 +56,7 @@ import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,12 +101,24 @@ public class CompactionTaskRunTest extends IngestionTestBase
   );
 
   private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private CoordinatorClient coordinatorClient;
+  private SegmentLoaderFactory segmentLoaderFactory;
   private ExecutorService exec;
+  private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
 
   public CompactionTaskRunTest()
   {
     TestUtils testUtils = new TestUtils();
     rowIngestionMetersFactory = testUtils.getRowIngestionMetersFactory();
+    coordinatorClient = new CoordinatorClient(null, null)
+    {
+      @Override
+      public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
+      {
+        return getStorageCoordinator().getUsedSegmentsForIntervals(dataSource, intervals);
+      }
+    };
+    segmentLoaderFactory = new SegmentLoaderFactory(getIndexIO(), getObjectMapper());
   }
 
   @Before
@@ -126,7 +143,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     final CompactionTask compactionTask = builder
@@ -156,7 +176,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     final CompactionTask compactionTask1 = builder
@@ -200,7 +223,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     final CompactionTask compactionTask1 = builder
@@ -248,7 +274,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     // day segmentGranularity
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index f759cc7..5117c1a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -41,6 +42,9 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
 import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
+import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
@@ -153,12 +157,15 @@ public class CompactionTaskTest
   private static List<AggregatorFactory> AGGREGATORS;
   private static List<DataSegment> SEGMENTS;
   private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
+  private static Map<DataSegment, File> segmentMap = new HashMap<>();
+  private static CoordinatorClient coordinatorClient = new TestCoordinatorClient(segmentMap);
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
-  private static Map<DataSegment, File> segmentMap;
+  private static RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
 
   private final boolean keepSegmentGranularity;
 
   private TaskToolbox toolbox;
+  private SegmentLoaderFactory segmentLoaderFactory;
 
   @BeforeClass
   public static void setupClass()
@@ -202,7 +209,6 @@ public class CompactionTaskTest
     AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
     AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
 
-    segmentMap = new HashMap<>(SEGMENT_INTERVALS.size());
     for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
       final Interval segmentInterval = Intervals.of(StringUtils.format("2017-0%d-01/2017-0%d-01", (i + 1), (i + 2)));
       segmentMap.put(
@@ -243,6 +249,8 @@ public class CompactionTaskTest
                   binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
                   binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
                   binder.bind(RowIngestionMetersFactory.class).toInstance(rowIngestionMetersFactory);
+                  binder.bind(CoordinatorClient.class).toInstance(coordinatorClient);
+                  binder.bind(SegmentLoaderFactory.class).toInstance(new SegmentLoaderFactory(null, objectMapper));
                 }
             )
         )
@@ -307,19 +315,21 @@ public class CompactionTaskTest
   @Before
   public void setup()
   {
+    final IndexIO testIndexIO = new TestIndexIO(objectMapper, segmentMap);
     toolbox = new TestTaskToolbox(
         new TestTaskActionClient(new ArrayList<>(segmentMap.keySet())),
-        new TestIndexIO(objectMapper, segmentMap),
+        testIndexIO,
         segmentMap
     );
+    segmentLoaderFactory = new SegmentLoaderFactory(testIndexIO, objectMapper);
   }
 
   @Parameters(name = "keepSegmentGranularity={0}")
   public static Collection<Object[]> parameters()
   {
     return ImmutableList.of(
-        new Object[] {false},
-        new Object[] {true}
+        new Object[]{false},
+        new Object[]{true}
     );
   }
 
@@ -336,7 +346,10 @@ public class CompactionTaskTest
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final CompactionTask task = builder
         .interval(COMPACTION_INTERVAL)
@@ -357,7 +370,10 @@ public class CompactionTaskTest
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final CompactionTask task = builder
         .segments(SEGMENTS)
@@ -378,7 +394,10 @@ public class CompactionTaskTest
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     final CompactionTask task = builder
@@ -426,7 +445,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
         keepSegmentGranularity
@@ -440,7 +462,13 @@ public class CompactionTaskTest
           )
       );
       Assert.assertEquals(6, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH);
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          AGGREGATORS,
+          SEGMENT_INTERVALS,
+          Granularities.MONTH
+      );
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
@@ -491,7 +519,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
         keepSegmentGranularity
@@ -564,7 +595,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
         keepSegmentGranularity
@@ -637,7 +671,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
         keepSegmentGranularity
@@ -710,7 +747,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     if (keepSegmentGranularity) {
@@ -760,7 +800,10 @@ public class CompactionTaskTest
         customMetricsSpec,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@@ -805,7 +848,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
         keepSegmentGranularity
@@ -819,7 +865,13 @@ public class CompactionTaskTest
           )
       );
       Assert.assertEquals(6, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH);
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          AGGREGATORS,
+          SEGMENT_INTERVALS,
+          Granularities.MONTH
+      );
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
@@ -850,7 +902,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
   }
 
@@ -871,7 +926,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
   }
 
@@ -886,7 +944,10 @@ public class CompactionTaskTest
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
 
     final CompactionTask task = builder
@@ -934,7 +995,10 @@ public class CompactionTaskTest
         null,
         keepSegmentGranularity,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
   }
 
@@ -949,7 +1013,10 @@ public class CompactionTaskTest
         null,
         null,
         new PeriodGranularity(Period.months(3), null, null),
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -982,7 +1049,10 @@ public class CompactionTaskTest
         null,
         false,
         new PeriodGranularity(Period.months(3), null, null),
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -1015,7 +1085,10 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        objectMapper
+        objectMapper,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
         true
@@ -1048,7 +1121,10 @@ public class CompactionTaskTest
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        rowIngestionMetersFactory
+        rowIngestionMetersFactory,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
     );
     final CompactionTask task = builder
         .interval(COMPACTION_INTERVAL)
@@ -1222,6 +1298,23 @@ public class CompactionTaskTest
     }
   }
 
+  private static class TestCoordinatorClient extends CoordinatorClient
+  {
+    private final Map<DataSegment, File> segmentMap;
+
+    TestCoordinatorClient(Map<DataSegment, File> segmentMap)
+    {
+      super(null, null);
+      this.segmentMap = segmentMap;
+    }
+
+    @Override
+    public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
+    {
+      return new ArrayList<>(segmentMap.keySet());
+    }
+  }
+
   private static class TestTaskToolbox extends TaskToolbox
   {
     private final Map<DataSegment, File> segmentFileMap;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 2c3d3c3..2db9272 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -107,7 +107,6 @@ import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
@@ -1076,9 +1075,7 @@ public class RealtimeIndexTaskTest
         () -> conglomerate,
         Execs.directExecutor(), // queryExecutorService
         EasyMock.createMock(MonitorScheduler.class),
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
         testUtils.getTestObjectMapper(),
         testUtils.getTestIndexIO(),
         MapCache.create(1024),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 3329419..7f44ad6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -29,6 +29,9 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.io.Files;
 import com.google.inject.Binder;
 import com.google.inject.Module;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -39,21 +42,16 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
 import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
-import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
-import org.apache.druid.indexing.common.actions.TaskActionToolbox;
-import org.apache.druid.indexing.common.actions.TaskAuditLogConfig;
-import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.NoopTask;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.TaskStorage;
-import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -61,7 +59,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
@@ -72,16 +69,9 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.loading.DataSegmentArchiver;
-import org.apache.druid.segment.loading.DataSegmentKiller;
-import org.apache.druid.segment.loading.DataSegmentMover;
-import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
-import org.apache.druid.segment.loading.StorageLocationConfig;
-import org.apache.druid.segment.realtime.firehose.IngestSegmentFirehose;
+import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
@@ -103,15 +93,12 @@ import org.junit.runners.Parameterized;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -170,157 +157,21 @@ public class IngestSegmentFirehoseFactoryTest
     }
     INDEX_MERGER_V9.persist(index, persistDir, indexSpec, null);
 
-    final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
+    final CoordinatorClient cc = new CoordinatorClient(null, null)
     {
-      private final Set<DataSegment> published = new HashSet<>();
-
       @Override
-      public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval)
+      public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
       {
         return ImmutableList.copyOf(segmentSet);
       }
-
-      @Override
-      public List<DataSegment> getUsedSegmentsForIntervals(String dataSource, List<Interval> interval)
-      {
-        return ImmutableList.copyOf(segmentSet);
-      }
-
-      @Override
-      public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
-      {
-        return ImmutableList.of();
-      }
-
-      @Override
-      public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments)
-      {
-        Set<DataSegment> added = new HashSet<>();
-        for (final DataSegment segment : segments) {
-          if (published.add(segment)) {
-            added.add(segment);
-          }
-        }
-
-        return ImmutableSet.copyOf(added);
-      }
-
-      @Override
-      public void deleteSegments(Set<DataSegment> segments)
-      {
-        // do nothing
-      }
     };
-    final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory(
-        TASK_STORAGE,
-        new TaskActionToolbox(
-            TASK_LOCKBOX,
-            TASK_STORAGE,
-            mdc,
-            newMockEmitter(),
-            EasyMock.createMock(SupervisorManager.class)
-        ),
-        new TaskAuditLogConfig(false)
-    );
+
     SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
     EasyMock.replay(notifierFactory);
 
-    SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
-    {
-      @Override
-      public List<StorageLocationConfig> getLocations()
-      {
-        return new ArrayList<>();
-      }
-    };
-    final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
-        new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
-        tac,
-        newMockEmitter(),
-        new DataSegmentPusher()
-        {
-          @Deprecated
-          @Override
-          public String getPathForHadoop(String dataSource)
-          {
-            return getPathForHadoop();
-          }
-
-          @Override
-          public String getPathForHadoop()
-          {
-            throw new UnsupportedOperationException();
-          }
-
-          @Override
-          public DataSegment push(File file, DataSegment segment, boolean useUniquePath)
-          {
-            return segment;
-          }
+    final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+    final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
 
-          @Override
-          public Map<String, Object> makeLoadSpec(URI uri)
-          {
-            throw new UnsupportedOperationException();
-          }
-        },
-        new DataSegmentKiller()
-        {
-          @Override
-          public void kill(DataSegment segments)
-          {
-
-          }
-
-          @Override
-          public void killAll()
-          {
-            throw new UnsupportedOperationException("not implemented");
-          }
-        },
-        new DataSegmentMover()
-        {
-          @Override
-          public DataSegment move(DataSegment dataSegment, Map<String, Object> targetLoadSpec)
-          {
-            return dataSegment;
-          }
-        },
-        new DataSegmentArchiver()
-        {
-          @Override
-          public DataSegment archive(DataSegment segment)
-          {
-            return segment;
-          }
-
-          @Override
-          public DataSegment restore(DataSegment segment)
-          {
-            return segment;
-          }
-        },
-        null, // segment announcer
-        null,
-        notifierFactory,
-        null, // query runner factory conglomerate corporation unionized collective
-        null, // query executor service
-        null, // monitor scheduler
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER)
-        ),
-        MAPPER,
-        INDEX_IO,
-        null,
-        null,
-        null,
-        INDEX_MERGER_V9,
-        null,
-        null,
-        null,
-        null,
-        new NoopTestTaskFileWriter()
-    );
     Collection<Object[]> values = new ArrayList<>();
     for (InputRowParser parser : Arrays.<InputRowParser>asList(
         ROW_PARSER,
@@ -342,27 +193,35 @@ public class IngestSegmentFirehoseFactoryTest
             null,
             ImmutableList.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME)
         )) {
-          final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
-              TASK.getDataSource(),
-              Intervals.ETERNITY,
-              new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
-              dim_names,
-              metric_names,
-              INDEX_IO
-          );
-          factory.setTaskToolbox(taskToolboxFactory.build(TASK));
-          values.add(
-              new Object[]{
-                  StringUtils.format(
-                      "DimNames[%s]MetricNames[%s]ParserDimNames[%s]",
-                      dim_names == null ? "null" : "dims",
-                      metric_names == null ? "null" : "metrics",
-                      parser == ROW_PARSER ? "dims" : "null"
-                  ),
-                  factory,
-                  parser
-              }
-          );
+          for (Boolean wrapInCombining : Arrays.asList(false, true)) {
+            final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory(
+                TASK.getDataSource(),
+                Intervals.ETERNITY,
+                new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
+                dim_names,
+                metric_names,
+                INDEX_IO,
+                cc,
+                slf,
+                retryPolicyFactory
+            );
+            final FirehoseFactory factory = wrapInCombining
+                                            ? new CombiningFirehoseFactory(ImmutableList.of(isfFactory))
+                                            : isfFactory;
+            values.add(
+                new Object[]{
+                    StringUtils.format(
+                        "DimNames[%s]MetricNames[%s]ParserDimNames[%s]WrapInCombining[%s]",
+                        dim_names == null ? "null" : "dims",
+                        metric_names == null ? "null" : "metrics",
+                        parser == ROW_PARSER ? "dims" : "null",
+                        wrapInCombining
+                    ),
+                    factory,
+                    parser
+                }
+            );
+          }
         }
       }
     }
@@ -407,7 +266,7 @@ public class IngestSegmentFirehoseFactoryTest
 
   public IngestSegmentFirehoseFactoryTest(
       String testName,
-      IngestSegmentFirehoseFactory factory,
+      FirehoseFactory factory,
       InputRowParser rowParser
   )
   {
@@ -436,7 +295,7 @@ public class IngestSegmentFirehoseFactoryTest
   private static final File persistDir = Paths.get(tmpDir.getAbsolutePath(), "indexTestMerger").toFile();
   private static final List<DataSegment> segmentSet = new ArrayList<>(MAX_SHARD_NUMBER);
 
-  private final IngestSegmentFirehoseFactory factory;
+  private final FirehoseFactory<InputRowParser> factory;
   private final InputRowParser rowParser;
 
   private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
@@ -518,15 +377,20 @@ public class IngestSegmentFirehoseFactoryTest
   @Test
   public void sanityTest()
   {
-    Assert.assertEquals(TASK.getDataSource(), factory.getDataSource());
-    if (factory.getDimensions() != null) {
-      Assert.assertArrayEquals(new String[]{DIM_NAME}, factory.getDimensions().toArray());
+    if (factory instanceof CombiningFirehoseFactory) {
+      // This method tests IngestSegmentFirehoseFactory-specific methods.
+      return;
+    }
+    final IngestSegmentFirehoseFactory isfFactory = (IngestSegmentFirehoseFactory) factory;
+    Assert.assertEquals(TASK.getDataSource(), isfFactory.getDataSource());
+    if (isfFactory.getDimensions() != null) {
+      Assert.assertArrayEquals(new String[]{DIM_NAME}, isfFactory.getDimensions().toArray());
     }
-    Assert.assertEquals(Intervals.ETERNITY, factory.getInterval());
-    if (factory.getMetrics() != null) {
+    Assert.assertEquals(Intervals.ETERNITY, isfFactory.getInterval());
+    if (isfFactory.getMetrics() != null) {
       Assert.assertEquals(
           ImmutableSet.of(METRIC_LONG_NAME, METRIC_FLOAT_NAME),
-          ImmutableSet.copyOf(factory.getMetrics())
+          ImmutableSet.copyOf(isfFactory.getMetrics())
       );
     }
   }
@@ -536,15 +400,17 @@ public class IngestSegmentFirehoseFactoryTest
   {
     Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
     Integer rowcount = 0;
-    try (final IngestSegmentFirehose firehose =
-             (IngestSegmentFirehose)
-                 factory.connect(rowParser, null)) {
+    try (final Firehose firehose = factory.connect(rowParser, null)) {
       while (firehose.hasMore()) {
         InputRow row = firehose.nextRow();
         Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
         Assert.assertArrayEquals(new String[]{DIM_VALUE}, row.getDimension(DIM_NAME).toArray());
         Assert.assertEquals(METRIC_LONG_VALUE.longValue(), row.getMetric(METRIC_LONG_NAME));
-        Assert.assertEquals(METRIC_FLOAT_VALUE, row.getMetric(METRIC_FLOAT_NAME).floatValue(), METRIC_FLOAT_VALUE * 0.0001);
+        Assert.assertEquals(
+            METRIC_FLOAT_VALUE,
+            row.getMetric(METRIC_FLOAT_NAME).floatValue(),
+            METRIC_FLOAT_VALUE * 0.0001
+        );
         ++rowcount;
       }
     }
@@ -563,9 +429,8 @@ public class IngestSegmentFirehoseFactoryTest
         )
     );
     int skipped = 0;
-    try (final IngestSegmentFirehose firehose =
-             (IngestSegmentFirehose)
-                 factory.connect(transformSpec.decorate(rowParser), null)) {
+    try (final Firehose firehose =
+             factory.connect(transformSpec.decorate(rowParser), null)) {
       while (firehose.hasMore()) {
         InputRow row = firehose.nextRow();
         if (row == null) {
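
For context on the nine-argument constructor exercised above, a minimal sketch of wiring the refactored factory directly; every lower-case identifier here (indexIO, coordinatorClient, segmentLoaderFactory, retryPolicyFactory, rowParser) is assumed to be in scope and is illustrative, not part of the patch:

    // Illustrative wiring only: the factory now receives its collaborators up
    // front, so there is no setTaskToolbox() call to make after construction.
    final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
        "some_datasource",                     // dataSource to re-ingest (example value)
        Intervals.of("2019-01-01/2019-02-01"), // interval to read (example value)
        null,                                  // dimFilter: no filtering
        null,                                  // dimensions: null falls back to discovery
        null,                                  // metrics: null falls back to discovery
        indexIO,
        coordinatorClient,
        segmentLoaderFactory,
        retryPolicyFactory
    );
    try (Firehose firehose = factory.connect(rowParser, null)) {
      while (firehose.hasMore()) {
        process(firehose.nextRow()); // process() is a placeholder
      }
    }
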
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 70e877b..2cc0948 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
@@ -34,20 +35,10 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.JSONParseSpec;
 import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.RetryPolicyConfig;
+import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
-import org.apache.druid.indexing.common.TaskLock;
-import org.apache.druid.indexing.common.TaskLockType;
-import org.apache.druid.indexing.common.TaskToolboxFactory;
 import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.actions.LockAcquireAction;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
-import org.apache.druid.indexing.common.actions.TaskAction;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
-import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.NoopTask;
-import org.apache.druid.indexing.common.task.NoopTestTaskFileWriter;
-import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -59,12 +50,8 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.TransformSpec;
-import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -283,81 +270,34 @@ public class IngestSegmentFirehoseFactoryTimelineTest
     final List<Object[]> constructors = new ArrayList<>();
 
     for (final TestCase testCase : testCases) {
-      final TaskActionClient taskActionClient = new TaskActionClient()
-      {
-        @Override
-        public <RetType> RetType submit(TaskAction<RetType> taskAction)
-        {
-          if (taskAction instanceof SegmentListUsedAction) {
-            // Expect the interval we asked for
-            final SegmentListUsedAction action = (SegmentListUsedAction) taskAction;
-            if (action.getIntervals().equals(ImmutableList.of(testCase.interval))) {
-              return (RetType) ImmutableList.copyOf(testCase.segments);
-            } else {
-              throw new IllegalArgumentException("WTF");
-            }
-          } else if (taskAction instanceof LockAcquireAction) {
-            return (RetType) new TaskLock(TaskLockType.EXCLUSIVE, null, DATA_SOURCE, Intervals.of("2000/2001"), "v1", 0);
-          } else {
-            throw new UnsupportedOperationException();
-          }
-        }
-      };
       SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class);
       EasyMock.replay(notifierFactory);
-      SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
+      final SegmentLoaderFactory slf = new SegmentLoaderFactory(null, MAPPER);
+      final RetryPolicyFactory retryPolicyFactory = new RetryPolicyFactory(new RetryPolicyConfig());
+      final CoordinatorClient cc = new CoordinatorClient(null, null)
       {
         @Override
-        public List<StorageLocationConfig> getLocations()
+        public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
         {
-          return new ArrayList<>();
+          // Expect the interval we asked for
+          if (intervals.equals(ImmutableList.of(testCase.interval))) {
+            return ImmutableList.copyOf(testCase.segments);
+          } else {
+            throw new IllegalArgumentException("WTF");
+          }
         }
       };
-      final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory(
-          new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null),
-          new TaskActionClientFactory()
-          {
-            @Override
-            public TaskActionClient create(Task task)
-            {
-              return taskActionClient;
-            }
-          },
-          new NoopServiceEmitter(),
-          null, // segment pusher
-          null, // segment killer
-          null, // segment mover
-          null, // segment archiver
-          null, // segment announcer,
-          null,
-          notifierFactory,
-          null, // query runner factory conglomerate corporation unionized collective
-          null, // query executor service
-          null, // monitor scheduler
-          new SegmentLoaderFactory(
-              new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, MAPPER)
-          ),
-          MAPPER,
-          INDEX_IO,
-          null,
-          null,
-          null,
-          INDEX_MERGER_V9,
-          null,
-          null,
-          null,
-          null,
-          new NoopTestTaskFileWriter()
-      );
       final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
           DATA_SOURCE,
           testCase.interval,
           new TrueDimFilter(),
           Arrays.asList(DIMENSIONS),
           Arrays.asList(METRICS),
-          INDEX_IO
+          INDEX_IO,
+          cc,
+          slf,
+          retryPolicyFactory
       );
-      factory.setTaskToolbox(taskToolboxFactory.build(NoopTask.create(DATA_SOURCE)));
 
       constructors.add(
           new Object[]{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index c8f9380..49315d3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.segment.loading.NoopDataSegmentArchiver;
 import org.apache.druid.segment.loading.NoopDataSegmentKiller;
 import org.apache.druid.segment.loading.NoopDataSegmentMover;
 import org.apache.druid.segment.loading.NoopDataSegmentPusher;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer;
 import org.apache.druid.server.initialization.ServerConfig;
@@ -94,7 +93,7 @@ public class SingleTaskBackgroundRunnerTest
         null,
         null,
         null,
-        new SegmentLoaderFactory(EasyMock.createMock(SegmentLoaderLocalCacheManager.class)),
+        new SegmentLoaderFactory(null, utils.getTestObjectMapper()),
         utils.getTestObjectMapper(),
         utils.getTestIndexIO(),
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 265150c..455c3f0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -106,7 +106,6 @@ import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentKiller;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentTest;
@@ -611,9 +610,7 @@ public class TaskLifecycleTest
         () -> queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective
         Execs.directExecutor(), // query executor service
         monitorScheduler, // monitor scheduler
-        new SegmentLoaderFactory(
-            new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, new DefaultObjectMapper())
-        ),
+        new SegmentLoaderFactory(null, new DefaultObjectMapper()),
         MAPPER,
         INDEX_IO,
         MapCache.create(0),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index 749d44a..b86b654 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -41,7 +41,6 @@ import org.apache.druid.indexing.overlord.TestTaskRunner;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
@@ -120,7 +119,7 @@ public class WorkerTaskManagerTest
                 null,
                 null,
                 null,
-                new SegmentLoaderFactory(new SegmentLoaderLocalCacheManager(null, loaderConfig, jsonMapper)),
+                new SegmentLoaderFactory(null, jsonMapper),
                 jsonMapper,
                 indexIO,
                 null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 20eb5fc..4afdd3c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -46,9 +46,6 @@ import org.apache.druid.indexing.overlord.TestRemoteTaskRunnerConfig;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.IndexMergerV9;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.initialization.IndexerZkConfig;
@@ -62,10 +59,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
+ *
  */
 public class WorkerTaskMonitorTest
 {
@@ -169,20 +166,8 @@ public class WorkerTaskMonitorTest
             new TaskToolboxFactory(
                 taskConfig,
                 taskActionClientFactory,
-                null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory(
-                new SegmentLoaderLocalCacheManager(
-                    null,
-                    new SegmentLoaderConfig()
-                    {
-                      @Override
-                      public List<StorageLocationConfig> getLocations()
-                      {
-                        return new ArrayList<>();
-                      }
-                    },
-                    jsonMapper
-                )
-            ),
+                null, null, null, null, null, null, null, notifierFactory, null, null, null,
+                new SegmentLoaderFactory(null, jsonMapper),
                 jsonMapper,
                 indexIO,
                 null,
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 2a197cf..4188540 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -28,11 +28,13 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.response.FullResponseHolder;
 import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
 import java.util.List;
 
 public class CoordinatorClient
@@ -95,13 +97,15 @@ public class CoordinatorClient
   {
     try {
       FullResponseHolder response = druidLeaderClient.go(
-          druidLeaderClient.makeRequest(HttpMethod.GET,
-                                        StringUtils.format(
-                                            "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
-                                            dataSource,
-                                            interval.toString().replace('/', '_'),
-                                            incompleteOk
-                                        ))
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format(
+                  "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s",
+                  StringUtils.urlEncode(dataSource),
+                  interval.toString().replace('/', '_'),
+                  incompleteOk
+              )
+          )
       );
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
@@ -121,4 +125,35 @@ public class CoordinatorClient
       throw new RuntimeException(e);
     }
   }
+
+  public List<DataSegment> getDatabaseSegmentDataSourceSegments(String dataSource, List<Interval> intervals)
+  {
+    try {
+      FullResponseHolder response = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(
+              HttpMethod.POST,
+              StringUtils.format(
+                  "/druid/coordinator/v1/metadata/datasources/%s/segments?full",
+                  StringUtils.urlEncode(dataSource)
+              )
+          ).setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(intervals))
+      );
+
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while fetching database segment data source segments status[%s] content[%s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      return jsonMapper.readValue(
+          response.getContent(), new TypeReference<List<DataSegment>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
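
A sketch of a call site for the new wrapper (datasource and interval are example values): it POSTs the interval list as JSON to /druid/coordinator/v1/metadata/datasources/{dataSource}/segments?full and deserializes the response into full DataSegment metadata.

    final List<DataSegment> segments = coordinatorClient.getDatabaseSegmentDataSourceSegments(
        "wikipedia",
        ImmutableList.of(Intervals.of("2019-01-01/P1M"))
    );
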
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 68e8a20..9298735 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -58,6 +58,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     }
   };
 
+  // Note that we only create this via injection in historical and realtime nodes. Peons create these
+  // objects via SegmentLoaderFactory objects, so that they can store segments in task-specific
+  // directories rather than statically configured directories.
   @Inject
   public SegmentLoaderLocalCacheManager(
       IndexIO indexIO,
@@ -79,11 +82,6 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
     }
   }
 
-  public SegmentLoaderLocalCacheManager withConfig(SegmentLoaderConfig config)
-  {
-    return new SegmentLoaderLocalCacheManager(indexIO, config, jsonMapper);
-  }
-
   @Override
   public boolean isSegmentLoaded(final DataSegment segment)
   {
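
Callers that previously rewrapped a manager via the removed withConfig() now construct one directly; a sketch, assuming indexIO, taskSpecificConfig, and jsonMapper are in scope:

    // Equivalent to the removed withConfig(config): withConfig() always
    // returned a brand-new manager anyway, so direct construction behaves
    // identically.
    final SegmentLoaderLocalCacheManager manager =
        new SegmentLoaderLocalCacheManager(indexIO, taskSpecificConfig, jsonMapper);
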
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 71dc305..e4334f4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -91,7 +91,6 @@ import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
 import org.apache.druid.segment.loading.OmniDataSegmentKiller;
 import org.apache.druid.segment.loading.OmniDataSegmentMover;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.ServiceAnnouncingChatHandlerProvider;
@@ -109,7 +108,6 @@ import org.eclipse.jetty.server.Server;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -255,12 +253,6 @@ public class CliPeon extends GuiceRunnable
                   .to(CoordinatorBasedSegmentHandoffNotifierFactory.class)
                   .in(LazySingleton.class);
 
-            // Override the default SegmentLoaderConfig because we don't actually care about the
-            // configuration based locations.  This will override them anyway.  This is also stopping
-            // configuration of other parameters, but I don't think that's actually a problem.
-            // Note, if that is actually not a problem, then that probably means we have the wrong abstraction.
-            binder.bind(SegmentLoaderConfig.class)
-                  .toInstance(new SegmentLoaderConfig().withLocations(Collections.emptyList()));
             binder.bind(CoordinatorClient.class).in(LazySingleton.class);
 
             binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
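
With the emptyList() locations override removed above, a peon task obtains its loader through SegmentLoaderFactory at the call site instead. A hypothetical sketch of that usage (the manufacturate() method name is taken from the pre-existing factory API, and taskWorkDir is illustrative):

    // The loader caches segments under the task's own working directory, so
    // the statically configured locations are never consulted.
    final SegmentLoaderFactory loaderFactory = new SegmentLoaderFactory(indexIO, jsonMapper);
    final SegmentLoader loader = loaderFactory.manufacturate(new File(taskWorkDir, "segments"));
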

