Posted to commits@druid.apache.org by ji...@apache.org on 2019/07/18 21:46:56 UTC

[incubator-druid] branch master updated: Add intermediary data server for shuffle (#8088)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c7eb7cd  Add intermediary data server for shuffle (#8088)
c7eb7cd is described below

commit c7eb7cd01837c48914ba284d08b6096b47c957b0
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Jul 18 14:46:47 2019 -0700

    Add intermediary data server for shuffle (#8088)
    
    * Add intermediary data server for shuffle
    
    * javadoc
    
    * adjust timeout
    
    * resolved todo
    
    * fix test
    
    * style
    
    * address comments
    
    * rename to shuffleDataLocations
    
    * Address comments
    
    * bit adjustment StorageLocation
    
    * fix test
    
    * address comment & fix test
    
    * handle interrupted exception
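
    For reviewers, the new IntermediaryDataManager exposes a small
    store/find/delete API, exercised by ShuffleResource and the new unit
    tests below. A minimal usage sketch (the manager instance, segment, and
    ids are illustrative, not from the patch):

        // Phase 1 sub tasks store partition files; the location is chosen
        // round-robin per supervisorTaskId among shuffleDataLocations.
        intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);

        // Phase 2 tasks look partitions up by (supervisorTaskId, interval, partitionId);
        // ShuffleResource streams the returned files back over HTTP.
        List<File> files =
            intermediaryDataManager.findPartitionFiles("supervisorTaskId", Intervals.of("2018/2019"), 0);

        // The overlord requests deletion when the supervisor task finishes;
        // the periodic self-cleanup covers missed requests.
        intermediaryDataManager.deletePartitions("supervisorTaskId");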
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   1 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   1 +
 .../indexing/common/SegmentLoaderFactory.java      |   2 +-
 .../druid/indexing/common/config/TaskConfig.java   |  25 +-
 .../indexing/overlord/http/OverlordResource.java   |   4 +-
 .../indexing/worker/IntermediaryDataManager.java   | 331 +++++++++++++++++++++
 .../druid/indexing/worker/config/WorkerConfig.java |  31 +-
 .../indexing/worker/http/ShuffleResource.java      | 122 ++++++++
 .../druid/indexing/common/TaskToolboxTest.java     |   2 +-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   2 +-
 .../common/task/CompactionTaskRunTest.java         |  11 +-
 .../druid/indexing/common/task/HadoopTaskTest.java |   1 +
 .../druid/indexing/common/task/IndexTaskTest.java  |   2 +-
 .../common/task/RealtimeIndexTaskTest.java         |   2 +-
 .../firehose/IngestSegmentFirehoseFactoryTest.java |  24 +-
 .../IngestSegmentFirehoseFactoryTimelineTest.java  |   4 +-
 .../overlord/SingleTaskBackgroundRunnerTest.java   |   1 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   2 +-
 .../IntermediaryDataManagerAutoCleanupTest.java    | 148 +++++++++
 ...ermediaryDataManagerManualAddAndDeleteTest.java | 174 +++++++++++
 .../indexing/worker/WorkerTaskManagerTest.java     |   1 +
 .../indexing/worker/WorkerTaskMonitorTest.java     |   1 +
 .../client/indexing/HttpIndexingServiceClient.java |  21 ++
 .../client/indexing/IndexingServiceClient.java     |   3 +
 .../druid/client/indexing/TaskStatusResponse.java  |   2 +-
 .../loading/SegmentLoaderLocalCacheManager.java    |   7 +-
 .../druid/segment/loading/StorageLocation.java     |  55 +++-
 .../segment/loading/StorageLocationConfig.java     |  56 ++--
 .../client/indexing/NoopIndexingServiceClient.java |   7 +
 .../SegmentLoaderLocalCacheManagerTest.java        |  36 +--
 .../druid/segment/loading/StorageLocationTest.java |  12 +-
 .../server/SegmentManagerThreadSafetyTest.java     |   2 +-
 .../org/apache/druid/cli/CliMiddleManager.java     |   5 +-
 33 files changed, 989 insertions(+), 109 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index edf4c22..3fd6511 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2562,6 +2562,7 @@ public class KafkaIndexTaskTest
         null,
         true,
         null,
+        null,
         null
     );
     final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bb83c08..4fc7ba1 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2747,6 +2747,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         null,
         true,
         null,
+        null,
         null
     );
     final TestDerbyConnector derbyConnector = derby.getConnector();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
index 83fa9db..17b8dc1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/SegmentLoaderFactory.java
@@ -54,7 +54,7 @@ public class SegmentLoaderFactory
     return new SegmentLoaderLocalCacheManager(
         indexIO,
         new SegmentLoaderConfig().withLocations(
-            Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
+            Collections.singletonList(new StorageLocationConfig(storageDir, null, null))),
         jsonMapper
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 94a7c7d..31405fe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -22,10 +22,13 @@ package org.apache.druid.indexing.common.config;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.List;
 
 public class TaskConfig
@@ -35,7 +38,6 @@ public class TaskConfig
   );
 
   private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
-
   private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");
 
   @JsonProperty
@@ -62,6 +64,9 @@ public class TaskConfig
   @JsonProperty
   private final Period directoryLockTimeout;
 
+  @JsonProperty
+  private final List<StorageLocationConfig> shuffleDataLocations;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -71,7 +76,8 @@ public class TaskConfig
       @JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
       @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart,
       @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
-      @JsonProperty("directoryLockTimeout") Period directoryLockTimeout
+      @JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
+      @JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations
   )
   {
     this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
@@ -89,6 +95,13 @@ public class TaskConfig
     this.directoryLockTimeout = directoryLockTimeout == null
                                 ? DEFAULT_DIRECTORY_LOCK_TIMEOUT
                                 : directoryLockTimeout;
+    if (shuffleDataLocations == null) {
+      this.shuffleDataLocations = Collections.singletonList(
+          new StorageLocationConfig(new File(defaultDir(null, "intermediary-segments")), null, null)
+      );
+    } else {
+      this.shuffleDataLocations = shuffleDataLocations;
+    }
   }
 
   @JsonProperty
@@ -154,7 +167,13 @@ public class TaskConfig
     return directoryLockTimeout;
   }
 
-  private String defaultDir(String configParameter, final String defaultVal)
+  @JsonProperty
+  public List<StorageLocationConfig> getShuffleDataLocations()
+  {
+    return shuffleDataLocations;
+  }
+
+  private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
       return Paths.get(getBaseDir(), defaultVal).toString();
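
When shuffleDataLocations is omitted, TaskConfig falls back to a single "intermediary-segments" directory under baseDir, as the constructor above shows. A construction sketch mirroring the new unit tests (the path and maxSize are illustrative, and the parameter comments are best-effort readings of the full constructor, which this hunk truncates):

    TaskConfig taskConfig = new TaskConfig(
        "/tmp/druid",  // baseDir
        null,          // baseTaskDir
        null,          // hadoopWorkingPath
        null,          // defaultRowFlushBoundary
        null,          // defaultHadoopCoordinates
        false,         // restoreTasksOnRestart
        null,          // gracefulShutdownTimeout
        null,          // directoryLockTimeout
        ImmutableList.of(new StorageLocationConfig(new File("/mnt/fast-disk/shuffle"), 10_000_000_000L, null))
    );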
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 7ea5c8e..b153958 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -372,9 +372,7 @@ public class OverlordResource
   @Path("/taskStatus")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(StateResourceFilter.class)
-  public Response getMultipleTaskStatuses(
-      Set<String> taskIds
-  )
+  public Response getMultipleTaskStatuses(Set<String> taskIds)
   {
     if (taskIds == null || taskIds.size() == 0) {
       return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds provided.").build();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
new file mode 100644
index 0000000..bfd202e
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/IntermediaryDataManager.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.google.common.collect.Iterators;
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.loading.StorageLocation;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * This class manages intermediary segments for data shuffle between native parallel index tasks.
+ * In native parallel indexing, phase 1 tasks store segment files in the local storage of middleManagers,
+ * and phase 2 tasks read those files via HTTP.
+ *
+ * The directory where segment files are placed is structured as
+ * {@link StorageLocation#path}/supervisorTaskId/startTimeOfSegment/endTimeOfSegment/partitionIdOfSegment.
+ *
+ * This class provides interfaces to store, find, and remove segment files.
+ * It also has a self-cleanup mechanism to clean up stale segment files. It periodically checks the last access time
+ * per supervisorTask and removes all of its segment files if the supervisorTask is not running anymore.
+ */
+@ManageLifecycle
+public class IntermediaryDataManager
+{
+  private static final Logger log = new Logger(IntermediaryDataManager.class);
+
+  private final long intermediaryPartitionDiscoveryPeriodSec;
+  private final long intermediaryPartitionCleanupPeriodSec;
+  private final Period intermediaryPartitionTimeout;
+  private final List<StorageLocation> shuffleDataLocations;
+  private final IndexingServiceClient indexingServiceClient;
+
+  // supervisorTaskId -> time to check supervisorTask status
+  // This time is initialized when a new supervisorTask is found and updated whenever a partition is accessed for
+  // the supervisor.
+  private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
+
+  // supervisorTaskId -> cyclic iterator of storage locations
+  private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
+
+  // The overlord is supposed to send a cleanup request as soon as the supervisorTask is finished in parallel indexing,
+  // but the middleManager or indexer could miss the request. This executor automatically cleans up unused intermediary
+  // partitions in that case.
+  // This can be null until IntermediaryDataManager is started.
+  @Nullable
+  private ScheduledExecutorService supervisorTaskChecker;
+
+  @Inject
+  public IntermediaryDataManager(
+      WorkerConfig workerConfig,
+      TaskConfig taskConfig,
+      IndexingServiceClient indexingServiceClient
+  )
+  {
+    this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
+    this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
+    this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();
+    this.shuffleDataLocations = taskConfig
+        .getShuffleDataLocations()
+        .stream()
+        .map(config -> new StorageLocation(config.getPath(), config.getMaxSize(), config.getFreeSpacePercent()))
+        .collect(Collectors.toList());
+    this.indexingServiceClient = indexingServiceClient;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
+    // Discover partitions for new supervisorTasks
+    supervisorTaskChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            discoverSupervisorTaskPartitions();
+          }
+          catch (Exception e) {
+            log.warn(e, "Error while discovering supervisorTasks");
+          }
+        },
+        intermediaryPartitionDiscoveryPeriodSec,
+        intermediaryPartitionDiscoveryPeriodSec,
+        TimeUnit.SECONDS
+    );
+
+    supervisorTaskChecker.scheduleAtFixedRate(
+        () -> {
+          try {
+            deleteExpiredSupervisorTaskPartitionsIfNotRunning();
+          }
+          catch (InterruptedException e) {
+            log.error(e, "Error while cleaning up partitions for expired supervisors");
+          }
+          catch (Exception e) {
+            log.warn(e, "Error while cleaning up partitions for expired supervisors");
+          }
+        },
+        intermediaryPartitionCleanupPeriodSec,
+        intermediaryPartitionCleanupPeriodSec,
+        TimeUnit.SECONDS
+    );
+  }
+
+  @LifecycleStop
+  public void stop() throws InterruptedException
+  {
+    if (supervisorTaskChecker != null) {
+      supervisorTaskChecker.shutdownNow();
+      supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
+    }
+    supervisorTaskCheckTimes.clear();
+  }
+
+  private void discoverSupervisorTaskPartitions()
+  {
+    for (StorageLocation location : shuffleDataLocations) {
+      final MutableInt numDiscovered = new MutableInt(0);
+      final File[] dirsPerSupervisorTask = location.getPath().listFiles();
+      if (dirsPerSupervisorTask != null) {
+        for (File supervisorTaskDir : dirsPerSupervisorTask) {
+          final String supervisorTaskId = supervisorTaskDir.getName();
+          supervisorTaskCheckTimes.computeIfAbsent(
+              supervisorTaskId,
+              k -> {
+                numDiscovered.increment();
+                return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
+              }
+          );
+        }
+      }
+      log.info("Discovered partitions for [%s] new supervisor tasks", numDiscovered.getValue());
+    }
+  }
+
+  /**
+   * Check the status of a supervisorTask if its partitions have not been accessed within the timeout, and
+   * delete all partitions for the supervisorTask if it has already finished.
+   *
+   * Note that the overlord sends a cleanup request when a supervisorTask finishes. The check below triggers
+   * the self-cleanup in case that request is missed.
+   */
+  private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws InterruptedException
+  {
+    final DateTime now = DateTimes.nowUtc();
+    final Set<String> expiredSupervisorTasks = new HashSet<>();
+    for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
+      final String supervisorTaskId = entry.getKey();
+      final DateTime checkTime = entry.getValue();
+      if (checkTime.isBefore(now)) {
+        expiredSupervisorTasks.add(supervisorTaskId);
+      }
+    }
+
+    log.info("Found [%s] expired supervisor tasks", expiredSupervisorTasks.size());
+
+    final Map<String, TaskStatus> taskStatuses = indexingServiceClient.getTaskStatuses(expiredSupervisorTasks);
+    for (Entry<String, TaskStatus> entry : taskStatuses.entrySet()) {
+      final String supervisorTaskId = entry.getKey();
+      final TaskStatus status = entry.getValue();
+      if (status.getStatusCode().isComplete()) {
+        // If it's finished, clean up all partitions for the supervisor task.
+        try {
+          deletePartitions(supervisorTaskId);
+        }
+        catch (IOException e) {
+          log.warn(e, "Failed to delete partitions for task[%s]", supervisorTaskId);
+        }
+      } else {
+        // If it's still running, push the next check time out by the timeout.
+        supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc().plus(intermediaryPartitionTimeout));
+      }
+    }
+  }
+
+  /**
+   * Write a segment into one of the configured locations. The location to write to is chosen in a round-robin
+   * manner per supervisorTaskId.
+   *
+   * This method is only useful for the new Indexer model. Tasks running in the existing middleManager should use
+   * the static addSegment method.
+   */
+  public void addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentFile)
+      throws IOException
+  {
+    final Iterator<StorageLocation> iterator = locationIterators.computeIfAbsent(
+        supervisorTaskId,
+        k -> Iterators.cycle(shuffleDataLocations)
+    );
+    addSegment(iterator, shuffleDataLocations.size(), supervisorTaskId, subTaskId, segment, segmentFile);
+  }
+
+  public List<File> findPartitionFiles(String supervisorTaskId, Interval interval, int partitionId)
+  {
+    for (StorageLocation location : shuffleDataLocations) {
+      final File partitionDir = getPartitionDir(location, supervisorTaskId, interval, partitionId);
+      if (partitionDir.exists()) {
+        supervisorTaskCheckTimes.put(supervisorTaskId, DateTimes.nowUtc().plus(intermediaryPartitionTimeout));
+        final File[] segmentFiles = partitionDir.listFiles();
+        return segmentFiles == null ? Collections.emptyList() : Arrays.asList(segmentFiles);
+      }
+    }
+
+    return Collections.emptyList();
+  }
+
+  public void deletePartitions(String supervisorTaskId) throws IOException
+  {
+    for (StorageLocation location : shuffleDataLocations) {
+      final File supervisorTaskPath = new File(location.getPath(), supervisorTaskId);
+      if (supervisorTaskPath.exists()) {
+        log.info("Cleaning up [%s]", supervisorTaskPath);
+        for (File eachFile : FileUtils.listFiles(supervisorTaskPath, null, true)) {
+          location.removeFile(eachFile);
+        }
+        FileUtils.forceDelete(supervisorTaskPath);
+      }
+    }
+    supervisorTaskCheckTimes.remove(supervisorTaskId);
+  }
+
+  /**
+   * Iterate through the given storage locations to find one which can handle the given segment.
+   */
+  public static void addSegment(
+      Iterator<StorageLocation> cyclicIterator,
+      int numLocations,
+      String supervisorTaskId,
+      String subTaskId,
+      DataSegment segment,
+      File segmentFile
+  ) throws IOException
+  {
+    final StorageLocation location = findLocationForSegment(cyclicIterator, numLocations, segment);
+    final File destFile = new File(
+        getPartitionDir(location, supervisorTaskId, segment.getInterval(), segment.getShardSpec().getPartitionNum()),
+        subTaskId
+    );
+    FileUtils.forceMkdirParent(destFile);
+    final long copiedBytes = Files.asByteSource(segmentFile).copyTo(Files.asByteSink(destFile));
+    if (copiedBytes == 0) {
+      throw new IOE(
+          "0 bytes copied after copying a segment file from [%s] to [%s]",
+          segmentFile.getAbsolutePath(),
+          destFile.getAbsolutePath()
+      );
+    }
+    location.addFile(destFile);
+  }
+
+  private static StorageLocation findLocationForSegment(
+      Iterator<StorageLocation> cyclicIterator,
+      int numLocations,
+      DataSegment segment
+  )
+  {
+    for (int i = 0; i < numLocations; i++) {
+      final StorageLocation location = cyclicIterator.next();
+      if (location.canHandle(segment)) {
+        return location;
+      }
+    }
+    throw new ISE("Can't find location to handle segment[%s]", segment);
+  }
+
+  private static File getPartitionDir(
+      StorageLocation location,
+      String supervisorTaskId,
+      Interval interval,
+      int partitionId
+  )
+  {
+    return FileUtils.getFile(
+        location.getPath(),
+        supervisorTaskId,
+        interval.getStart().toString(),
+        interval.getEnd().toString(),
+        String.valueOf(partitionId)
+    );
+  }
+}
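
To make the layout from the class javadoc concrete, the following sketch shows the path getPartitionDir() produces for a hypothetical supervisor task (all names illustrative):

    // {locationPath}/{supervisorTaskId}/{intervalStart}/{intervalEnd}/{partitionId}
    File partitionDir = FileUtils.getFile(   // org.apache.commons.io.FileUtils
        new File("/var/druid/task/intermediary-segments"),
        "index_parallel_wikipedia_abc123",   // supervisorTaskId
        "2018-01-01T00:00:00.000Z",          // interval start
        "2019-01-01T00:00:00.000Z",          // interval end
        "0"                                  // partitionId
    );
    // addSegment() then writes each sub task's file as {partitionDir}/{subTaskId}.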
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index e574a0d..65d09e7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.worker.config;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.utils.JvmUtils;
+import org.joda.time.Period;
 
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
@@ -32,15 +33,24 @@ public class WorkerConfig
 {
   @JsonProperty
   @NotNull
-  private String ip = DruidNode.getDefaultHost();
+  private final String ip = DruidNode.getDefaultHost();
 
   @JsonProperty
   @NotNull
-  private String version = "0";
+  private final String version = "0";
 
   @JsonProperty
   @Min(1)
-  private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
+  private final int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
+
+  @JsonProperty
+  private final long intermediaryPartitionDiscoveryPeriodSec = 60L;
+
+  @JsonProperty
+  private final long intermediaryPartitionCleanupPeriodSec = 300L;
+
+  @JsonProperty
+  private final Period intermediaryPartitionTimeout = new Period("P1D");
 
   public String getIp()
   {
@@ -56,4 +66,19 @@ public class WorkerConfig
   {
     return capacity;
   }
+
+  public long getIntermediaryPartitionDiscoveryPeriodSec()
+  {
+    return intermediaryPartitionDiscoveryPeriodSec;
+  }
+
+  public long getIntermediaryPartitionCleanupPeriodSec()
+  {
+    return intermediaryPartitionCleanupPeriodSec;
+  }
+
+  public Period getIntermediaryPartitionTimeout()
+  {
+    return intermediaryPartitionTimeout;
+  }
 }
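
The three new settings are plain @JsonProperty fields, so they should be tunable like the existing WorkerConfig properties (an assumption based on how WorkerConfig is bound; the patch itself only adds the fields and getters). The auto-cleanup test below shortens them by overriding the getters, for example:

    WorkerConfig workerConfig = new WorkerConfig()
    {
      @Override
      public long getIntermediaryPartitionDiscoveryPeriodSec()
      {
        return 1L; // look for new supervisor task directories every second
      }

      @Override
      public Period getIntermediaryPartitionTimeout()
      {
        return new Period("PT2S"); // treat partitions as stale after 2 seconds without access
      }
    };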
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
new file mode 100644
index 0000000..9bf197c
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/ShuffleResource.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.http;
+
+import com.google.common.io.ByteStreams;
+import com.google.inject.Inject;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.indexing.worker.IntermediaryDataManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.http.security.StateResourceFilter;
+import org.joda.time.Interval;
+
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * HTTP endpoints for the shuffle system. The MiddleManager and Indexer use this resource to serve intermediary
+ * shuffle data.
+ *
+ * We use {@link StateResourceFilter} here because it performs an admin-like authorization and
+ * all endpoints here are supposed to be used only for internal communication.
+ * Another possible alternative would be datasource-level authorization as in TaskResourceFilter.
+ * However, datasource information is not yet available in middleManagers or indexers, which makes it hard to use.
+ * We could develop a new ResourceFilter in the future if needed.
+ */
+@Path("/druid/worker/v1/shuffle")
+@ResourceFilters(StateResourceFilter.class)
+public class ShuffleResource
+{
+  private static final Logger log = new Logger(ShuffleResource.class);
+
+  private final IntermediaryDataManager intermediaryDataManager;
+
+  @Inject
+  public ShuffleResource(IntermediaryDataManager intermediaryDataManager)
+  {
+    this.intermediaryDataManager = intermediaryDataManager;
+  }
+
+  @GET
+  @Path("/task/{supervisorTaskId}/partition")
+  @Produces(MediaType.APPLICATION_OCTET_STREAM)
+  public Response getPartition(
+      @PathParam("supervisorTaskId") String supervisorTaskId,
+      @QueryParam("startTime") String startTime,
+      @QueryParam("endTime") String endTime,
+      @QueryParam("partitionId") int partitionId
+  )
+  {
+    final Interval interval = new Interval(DateTimes.of(startTime), DateTimes.of(endTime));
+    final List<File> partitionFiles = intermediaryDataManager.findPartitionFiles(
+        supervisorTaskId,
+        interval,
+        partitionId
+    );
+
+    if (partitionFiles.isEmpty()) {
+      final String errorMessage = StringUtils.format(
+          "Can't find the partition for supervisor[%s], interval[%s], and partitionId[%s]",
+          supervisorTaskId,
+          interval,
+          partitionId
+      );
+      return Response.status(Status.NOT_FOUND).entity(errorMessage).build();
+    } else {
+      return Response.ok(
+          (StreamingOutput) output -> {
+            for (File partitionFile : partitionFiles) {
+              try (final FileInputStream fileInputStream = new FileInputStream(partitionFile)) {
+                ByteStreams.copy(fileInputStream, output);
+              }
+            }
+          }
+      ).build();
+    }
+  }
+
+  @DELETE
+  @Path("/task/{supervisorTaskId}")
+  public Response deletePartitions(@PathParam("supervisorTaskId") String supervisorTaskId)
+  {
+    try {
+      intermediaryDataManager.deletePartitions(supervisorTaskId);
+      return Response.ok(supervisorTaskId).build();
+    }
+    catch (IOException e) {
+      log.error(e, "Error while deleting partitions of supervisorTask[%s]", supervisorTaskId);
+      return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+    }
+  }
+}
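
A sketch of how a phase 2 task could pull a partition from this endpoint (host, port, and ids are illustrative; this client side is not part of the patch):

    // imports: java.io.*, java.net.HttpURLConnection, java.net.URL, com.google.common.io.ByteStreams
    URL url = new URL(
        "http://middlemanager.example.com:8091/druid/worker/v1/shuffle/task/"
        + "index_parallel_wikipedia_abc123/partition"
        + "?startTime=2018-01-01T00:00:00.000Z&endTime=2019-01-01T00:00:00.000Z&partitionId=0"
    );
    final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    try (InputStream input = connection.getInputStream();
         OutputStream output = new FileOutputStream("/tmp/partition_0")) {
      // The response body is the concatenation of all sub task files for the partition.
      ByteStreams.copy(input, output);
    }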
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
index 0966d1b..0575afe 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java
@@ -95,7 +95,7 @@ public class TaskToolboxTest
     EasyMock.replay(task, mockHandoffNotifierFactory);
 
     taskToolbox = new TaskToolboxFactory(
-        new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null),
+        new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null, null),
         mockTaskActionClientFactory,
         mockEmitter,
         mockSegmentPusher,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index d675069..7628dea 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1519,7 +1519,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
         return result;
       }
     };
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
 
     final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
         taskLockbox,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index f157513..9e5ab4c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -346,16 +346,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
           @Override
           public List<StorageLocationConfig> getLocations()
           {
-            return ImmutableList.of(
-                new StorageLocationConfig()
-                {
-                  @Override
-                  public File getPath()
-                  {
-                    return deepStorageDir;
-                  }
-                }
-            );
+            return ImmutableList.of(new StorageLocationConfig(deepStorageDir, null, null));
           }
         },
         objectMapper
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
index c77f063..64d3dd8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java
@@ -78,6 +78,7 @@ public class HadoopTaskTest
         ImmutableList.of("something:hadoop:1"),
         false,
         null,
+        null,
         null
     )).once();
     EasyMock.replay(toolbox);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 4bae4c8..a8f02e6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -204,7 +204,7 @@ public class IndexTaskTest
           public List<StorageLocationConfig> getLocations()
           {
             return Collections.singletonList(
-                new StorageLocationConfig().setPath(cacheDir)
+                new StorageLocationConfig(cacheDir, null, null)
             );
           }
         },
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index daeeaab..316327b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -881,7 +881,7 @@ public class RealtimeIndexTaskTest
       final File directory
   )
   {
-    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null);
+    final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, true, null, null, null);
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
     try {
       taskStorage.insert(task, TaskStatus.running(task.getId()));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 70e5544..f17274a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -84,10 +84,14 @@ import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.PartitionHolder;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -115,6 +119,9 @@ public class IngestSegmentFirehoseFactoryTest
   private static final TaskLockbox TASK_LOCKBOX;
   private static final Task TASK;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   static {
     TestUtils testUtils = new TestUtils();
     MAPPER = setupInjectablesInObjectMapper(TestHelper.makeJsonMapper());
@@ -299,6 +306,7 @@ public class IngestSegmentFirehoseFactoryTest
 
   private final FirehoseFactory<InputRowParser> factory;
   private final InputRowParser rowParser;
+  private File tempDir;
 
   private static final InputRowParser<Map<String, Object>> ROW_PARSER = new MapInputRowParser(
       new TimeAndDimsParseSpec(
@@ -376,6 +384,18 @@ public class IngestSegmentFirehoseFactoryTest
     }
   }
 
+  @Before
+  public void setup() throws IOException
+  {
+    tempDir = temporaryFolder.newFolder();
+  }
+
+  @After
+  public void teardown()
+  {
+    tempDir.delete();
+  }
+
   @Test
   public void sanityTest()
   {
@@ -402,7 +422,7 @@ public class IngestSegmentFirehoseFactoryTest
   {
     Assert.assertEquals(MAX_SHARD_NUMBER.longValue(), segmentSet.size());
     Integer rowcount = 0;
-    try (final Firehose firehose = factory.connect(rowParser, null)) {
+    try (final Firehose firehose = factory.connect(rowParser, tempDir)) {
       while (firehose.hasMore()) {
         InputRow row = firehose.nextRow();
         Assert.assertArrayEquals(new String[]{DIM_NAME}, row.getDimensions().toArray());
@@ -432,7 +452,7 @@ public class IngestSegmentFirehoseFactoryTest
     );
     int skipped = 0;
     try (final Firehose firehose =
-             factory.connect(transformSpec.decorate(rowParser), null)) {
+             factory.connect(transformSpec.decorate(rowParser), tempDir)) {
       while (firehose.hasMore()) {
         InputRow row = firehose.nextRow();
         if (row == null) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 119097b..4944d27 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -148,7 +148,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
     int count = 0;
     long sum = 0;
 
-    try (final Firehose firehose = factory.connect(ROW_PARSER, null)) {
+    try (final Firehose firehose = factory.connect(ROW_PARSER, tmpDir)) {
       while (firehose.hasMore()) {
         final InputRow row = firehose.nextRow();
         count++;
@@ -176,7 +176,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
     for (InputSplit<List<WindowedSegmentId>> split : splits) {
       final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
           factory.withSplit(split);
-      try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) {
+      try (final Firehose firehose = splitFactory.connect(ROW_PARSER, tmpDir)) {
         while (firehose.hasMore()) {
           final InputRow row = firehose.nextRow();
           count++;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 49315d3..a16a80a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -76,6 +76,7 @@ public class SingleTaskBackgroundRunnerTest
         null,
         true,
         null,
+        null,
         null
     );
     final ServiceEmitter emitter = new NoopServiceEmitter();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index d8c0af3..8139f8d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -540,7 +540,7 @@ public class TaskLifecycleTest
         new TaskAuditLogConfig(true)
     );
     File tmpDir = temporaryFolder.newFolder();
-    taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
+    taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null);
 
     SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
     {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
new file mode 100644
index 0000000..c158c0e
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerAutoCleanupTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.amazonaws.util.StringUtils;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.client.indexing.TaskStatus;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class IntermediaryDataManagerAutoCleanupTest
+{
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  private IntermediaryDataManager intermediaryDataManager;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final WorkerConfig workerConfig = new WorkerConfig()
+    {
+      @Override
+      public long getIntermediaryPartitionDiscoveryPeriodSec()
+      {
+        return 1;
+      }
+
+      @Override
+      public long getIntermediaryPartitionCleanupPeriodSec()
+      {
+        return 2;
+      }
+
+      @Override
+      public Period getIntermediaryPartitionTimeout()
+      {
+        return new Period("PT2S");
+      }
+
+    };
+    final TaskConfig taskConfig = new TaskConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null))
+    );
+    final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient()
+    {
+      @Override
+      public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
+      {
+        final Map<String, TaskStatus> result = new HashMap<>();
+        for (String taskId : taskIds) {
+          result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
+        }
+        return result;
+      }
+    };
+    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager.start();
+  }
+
+  @After
+  public void teardown() throws InterruptedException
+  {
+    intermediaryDataManager.stop();
+  }
+
+  @Test
+  public void testCleanup() throws IOException, InterruptedException
+  {
+    final String supervisorTaskId = "supervisorTaskId";
+    final Interval interval = Intervals.of("2018/2019");
+    final File segmentFile = generateSegmentFile();
+    final DataSegment segment = newSegment(interval, 0);
+    intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
+
+    Thread.sleep(8000);
+    Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, 0).isEmpty());
+  }
+
+  private File generateSegmentFile() throws IOException
+  {
+    final File segmentFile = tempDir.newFile();
+    FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
+    return segmentFile;
+  }
+
+  private DataSegment newSegment(Interval interval, int partitionId)
+  {
+    return new DataSegment(
+        "dataSource",
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        new NumberedShardSpec(partitionId, 0),
+        9,
+        10
+    );
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
new file mode 100644
index 0000000..7bd9e90
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/IntermediaryDataManagerManualAddAndDeleteTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker;
+
+import com.amazonaws.util.StringUtils;
+import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+
+public class IntermediaryDataManagerManualAddAndDeleteTest
+{
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  private IntermediaryDataManager intermediaryDataManager;
+
+  @Before
+  public void setup() throws IOException
+  {
+    final WorkerConfig workerConfig = new WorkerConfig();
+    final TaskConfig taskConfig = new TaskConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        false,
+        null,
+        null,
+        ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), 150L, null))
+    );
+    final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
+    intermediaryDataManager = new IntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    intermediaryDataManager.start();
+  }
+
+  @After
+  public void teardown() throws InterruptedException
+  {
+    intermediaryDataManager.stop();
+  }
+
+  @Test
+  public void testAddSegmentFailure() throws IOException
+  {
+    for (int i = 0; i < 15; i++) {
+      File segmentFile = generateSegmentFile();
+      DataSegment segment = newSegment(Intervals.of("2018/2019"), i);
+      intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
+    }
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage("Can't find location to handle segment");
+    File segmentFile = generateSegmentFile();
+    DataSegment segment = newSegment(Intervals.of("2018/2019"), 16);
+    intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
+  }
+
+  @Test
+  public void testFindPartitionFiles() throws IOException
+  {
+    final String supervisorTaskId = "supervisorTaskId";
+    final Interval interval = Intervals.of("2018/2019");
+    final int partitionId = 0;
+    for (int i = 0; i < 10; i++) {
+      final File segmentFile = generateSegmentFile();
+      final DataSegment segment = newSegment(interval, partitionId);
+      intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + i, segment, segmentFile);
+    }
+    final List<File> files = intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId);
+    Assert.assertEquals(10, files.size());
+    files.sort(Comparator.comparing(File::getName));
+    for (int i = 0; i < 10; i++) {
+      Assert.assertEquals("subTaskId_" + i, files.get(i).getName());
+    }
+  }
+
+  @Test
+  public void deletePartitions() throws IOException
+  {
+    final String supervisorTaskId = "supervisorTaskId";
+    final Interval interval = Intervals.of("2018/2019");
+    for (int partitionId = 0; partitionId < 5; partitionId++) {
+      for (int subTaskId = 0; subTaskId < 3; subTaskId++) {
+        final File segmentFile = generateSegmentFile();
+        final DataSegment segment = newSegment(interval, partitionId);
+        intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId_" + subTaskId, segment, segmentFile);
+      }
+    }
+
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+
+    for (int partitionId = 0; partitionId < 5; partitionId++) {
+      Assert.assertTrue(intermediaryDataManager.findPartitionFiles(supervisorTaskId, interval, partitionId).isEmpty());
+    }
+  }
+
+  @Test
+  public void testAddRemoveAdd() throws IOException
+  {
+    final String supervisorTaskId = "supervisorTaskId";
+    final Interval interval = Intervals.of("2018/2019");
+    for (int i = 0; i < 15; i++) {
+      File segmentFile = generateSegmentFile();
+      DataSegment segment = newSegment(interval, i);
+      intermediaryDataManager.addSegment("supervisorTaskId", "subTaskId", segment, segmentFile);
+    }
+    intermediaryDataManager.deletePartitions(supervisorTaskId);
+    File segmentFile = generateSegmentFile();
+    DataSegment segment = newSegment(interval, 16);
+    intermediaryDataManager.addSegment(supervisorTaskId, "subTaskId", segment, segmentFile);
+  }
+
+  private File generateSegmentFile() throws IOException
+  {
+    final File segmentFile = tempDir.newFile();
+    FileUtils.write(segmentFile, "test data.", StringUtils.UTF8);
+    return segmentFile;
+  }
+
+  private DataSegment newSegment(Interval interval, int partitionId)
+  {
+    return new DataSegment(
+        "dataSource",
+        interval,
+        "version",
+        null,
+        null,
+        null,
+        new NumberedShardSpec(partitionId, 0),
+        9,
+        10
+    );
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
index b86b654..635bf4a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java
@@ -85,6 +85,7 @@ public class WorkerTaskManagerTest
         null,
         false,
         null,
+        null,
         null
     );
     TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
index 4afdd3c..8c22aaa 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java
@@ -153,6 +153,7 @@ public class WorkerTaskMonitorTest
         null,
         false,
         null,
+        null,
         null
     );
     TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 7588da7..2b609c2 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -271,6 +271,27 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
+  public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException
+  {
+    try {
+      final FullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/taskStatus")
+                           .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(taskIds))
+      );
+
+      return jsonMapper.readValue(
+          responseHolder.getContent(),
+          new TypeReference<Map<String, TaskStatus>>()
+          {
+          }
+      );
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   @Nullable
   public TaskStatusPlus getLastCompleteTask()
   {
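
The new bulk-status method posts the task id set to the overlord's /druid/indexer/v1/taskStatus endpoint added in OverlordResource above. A usage sketch, assuming an injected IndexingServiceClient and a Druid Logger in scope (ids are illustrative; note the method declares InterruptedException):

    Map<String, TaskStatus> statuses =
        indexingServiceClient.getTaskStatuses(ImmutableSet.of("supervisor1", "supervisor2"));
    statuses.forEach(
        (taskId, status) -> log.info("task[%s] is in state[%s]", taskId, status.getStatusCode())
    );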
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index f2299c9..39fd93f 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -27,6 +27,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface IndexingServiceClient
 {
@@ -55,6 +56,8 @@ public interface IndexingServiceClient
 
   TaskStatusResponse getTaskStatus(String taskId);
 
+  Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds) throws InterruptedException;
+
   @Nullable
   TaskStatusPlus getLastCompleteTask();
 
diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java b/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java
index 08a8519..247ecabd 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/TaskStatusResponse.java
@@ -33,7 +33,7 @@ public class TaskStatusResponse
 {
   private final String task; // Task ID, named "task" in the JSONification of this class.
   @Nullable
-  private final TaskStatusPlus status;
+  private final TaskStatusPlus status; // null for unknown tasks
 
   @JsonCreator
   public TaskStatusResponse(
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
index 327db5d..b25c216 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java
@@ -163,8 +163,9 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
         if (loc == null) {
           loc = loadSegmentWithRetry(segment, storageDir);
         }
-        loc.addSegment(segment);
-        return new File(loc.getPath(), storageDir);
+        final File localStorageDir = new File(loc.getPath(), storageDir);
+        loc.addSegmentDir(localStorageDir, segment);
+        return localStorageDir;
       }
       finally {
         unlock(segment, lock);
@@ -270,7 +271,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
             // Druid creates folders of the form dataSource/interval/version/partitionNum.
             // We need to clean up all these directories if they are all empty.
             cleanupCacheFiles(location.getPath(), localStorageDir);
-            location.removeSegment(segment);
+            location.removeSegmentDir(localStorageDir, segment);
           }
         }
       }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
index e91ffee..4fb4c4e 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.segment.loading;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.DataSegment;
 
@@ -29,18 +31,18 @@ import java.util.Set;
 
 /**
 */
-class StorageLocation
+public class StorageLocation
 {
   private static final Logger log = new Logger(StorageLocation.class);
 
   private final File path;
   private final long maxSize;
   private final long freeSpaceToKeep;
-  private final Set<DataSegment> segments;
+  private final Set<File> files = new HashSet<>();
 
   private volatile long currSize = 0;
 
-  StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
+  public StorageLocation(File path, long maxSize, @Nullable Double freeSpacePercent)
   {
     this.path = path;
     this.maxSize = maxSize;
@@ -57,35 +59,62 @@ class StorageLocation
     } else {
       this.freeSpaceToKeep = 0;
     }
-
-    this.segments = new HashSet<>();
   }
 
-  File getPath()
+  public File getPath()
   {
     return path;
   }
 
-  long getMaxSize()
+  public long getMaxSize()
   {
     return maxSize;
   }
 
-  synchronized void addSegment(DataSegment segment)
+  /**
+   * Add a new file to this location. The given argument must be a file rather than a directory.
+   */
+  public synchronized void addFile(File file)
+  {
+    if (file.isDirectory()) {
+      throw new ISE("[%s] must be a file, not a directory", file);
+    }
+    if (files.add(file)) {
+      currSize += FileUtils.sizeOf(file);
+    }
+  }
+
+  /**
+   * Add a new segment dir to this location. The segment size is added to currSize.
+   */
+  public synchronized void addSegmentDir(File segmentDir, DataSegment segment)
   {
-    if (segments.add(segment)) {
+    if (files.add(segmentDir)) {
       currSize += segment.getSize();
     }
   }
 
-  synchronized void removeSegment(DataSegment segment)
+  /**
+   * Remove a file from this location. The given argument must be a file rather than a directory.
+   */
+  public synchronized void removeFile(File file)
+  {
+    if (files.remove(file)) {
+      currSize -= FileUtils.sizeOf(file);
+    }
+  }
+
+  /**
+   * Remove a segment dir from this location. The segment size is subtracted from currSize.
+   */
+  public synchronized void removeSegmentDir(File segmentDir, DataSegment segment)
   {
-    if (segments.remove(segment)) {
+    if (files.remove(segmentDir)) {
       currSize -= segment.getSize();
     }
   }
 
-  boolean canHandle(DataSegment segment)
+  public boolean canHandle(DataSegment segment)
   {
     if (available() < segment.getSize()) {
       log.warn(
@@ -114,7 +143,7 @@ class StorageLocation
     return true;
   }
 
-  synchronized long available()
+  public synchronized long available()
   {
     return maxSize - currSize;
   }
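 
StorageLocation now tracks plain files alongside segment directories, so shuffle
(intermediary) data and cached segments can share one maxSize budget. A small
sketch of the new accounting, using hypothetical paths and sizes:

    import org.apache.druid.segment.loading.StorageLocation;

    import java.io.File;

    public class LocationAccountingSketch
    {
      public static void main(String[] args) throws Exception
      {
        // 10 MB budget, no free-space floor (both values are illustrative).
        StorageLocation location = new StorageLocation(new File("/tmp/druid/shuffle"), 10_000_000L, null);

        // A loose file (e.g., a shuffle partition) is measured on disk via
        // FileUtils.sizeOf(); create it first so it can be measured, and note
        // that addFile() rejects directories.
        File partitionFile = new File("/tmp/druid/shuffle/supervisor_0/part_0");
        partitionFile.getParentFile().mkdirs();
        partitionFile.createNewFile();

        location.addFile(partitionFile);
        System.out.println("available = " + location.available());
        location.removeFile(partitionFile);
      }
    }

Note the asymmetry: addFile() measures the file on disk, while addSegmentDir()
trusts the size recorded in the DataSegment metadata.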
diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java
index 8877940..bbfd7ed 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocationConfig.java
@@ -19,66 +19,66 @@
 
 package org.apache.druid.segment.loading;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
 import java.io.File;
 
 /**
  */
 public class StorageLocationConfig
 {
-  @JsonProperty
-  @NotNull
-  private File path = null;
+  private final File path;
+  private final long maxSize;
+  @Nullable
+  private final Double freeSpacePercent;
 
-  @JsonProperty
-  @Min(1)
-  private long maxSize = Long.MAX_VALUE;
+  @JsonCreator
+  public StorageLocationConfig(
+      @JsonProperty("path") File path,
+      @JsonProperty("maxSize") @Nullable Long maxSize,
+      @JsonProperty("freeSpacePercent") @Nullable Double freeSpacePercent
+  )
+  {
+    this.path = Preconditions.checkNotNull(path, "path");
+    this.maxSize = maxSize == null ? Long.MAX_VALUE : maxSize;
+    this.freeSpacePercent = freeSpacePercent;
+    Preconditions.checkArgument(this.maxSize > 0, "maxSize[%s] should be positive", this.maxSize);
+    Preconditions.checkArgument(
+        this.freeSpacePercent == null || this.freeSpacePercent >= 0,
+        "freeSpacePercent[%s] should be 0 or a positive double",
+        this.freeSpacePercent
+    );
+  }
 
   @JsonProperty
-  private Double freeSpacePercent;
-
   public File getPath()
   {
     return path;
   }
 
-  public StorageLocationConfig setPath(File path)
-  {
-    this.path = path;
-    return this;
-  }
-
+  @JsonProperty
   public long getMaxSize()
   {
     return maxSize;
   }
 
-  public StorageLocationConfig setMaxSize(long maxSize)
-  {
-    this.maxSize = maxSize;
-    return this;
-  }
-
+  @JsonProperty
+  @Nullable
   public Double getFreeSpacePercent()
   {
     return freeSpacePercent;
   }
 
-  public StorageLocationConfig setFreeSpacePercent(Double freeSpacePercent)
-  {
-    this.freeSpacePercent = freeSpacePercent;
-    return this;
-  }
-
   @Override
   public String toString()
   {
     return "StorageLocationConfig{" +
            "path=" + path +
            ", maxSize=" + maxSize +
+           ", freeSpacePercent=" + freeSpacePercent +
            '}';
   }
 }
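 
Since StorageLocationConfig is now immutable and built through a @JsonCreator,
invalid values (non-positive maxSize, negative freeSpacePercent) fail at
construction time instead of surfacing later. A sketch of deserializing the same
JSON shape as before (the path is illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.segment.loading.StorageLocationConfig;

    public class LocationConfigSketch
    {
      public static void main(String[] args) throws Exception
      {
        // maxSize and freeSpacePercent may be omitted; maxSize then defaults
        // to Long.MAX_VALUE and freeSpacePercent stays null.
        StorageLocationConfig config = new ObjectMapper().readValue(
            "{\"path\":\"/mnt/druid/segment-cache\",\"maxSize\":10000000000}",
            StorageLocationConfig.class
        );
        System.out.println(config);
      }
    }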
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 8908173..794ea08 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -28,6 +28,7 @@ import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class NoopIndexingServiceClient implements IndexingServiceClient
 {
@@ -85,6 +86,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
     return new TaskStatusResponse(taskId, null);
   }
 
+  @Override
+  public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
+  {
+    return Collections.emptyMap();
+  }
+
   @Nullable
   @Override
   public TaskStatusPlus getLastCompleteTask()
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
index 5a54f8a..cb38d7b 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
@@ -88,9 +88,7 @@ public class SegmentLoaderLocalCacheManagerTest
     localSegmentCacheFolder = tmpFolder.newFolder("segment_cache_folder");
 
     final List<StorageLocationConfig> locations = new ArrayList<>();
-    final StorageLocationConfig locationConfig = new StorageLocationConfig();
-    locationConfig.setPath(localSegmentCacheFolder);
-    locationConfig.setMaxSize(10000000000L);
+    final StorageLocationConfig locationConfig = new StorageLocationConfig(localSegmentCacheFolder, 10000000000L, null);
     locations.add(locationConfig);
 
     manager = new SegmentLoaderLocalCacheManager(
@@ -157,14 +155,10 @@ public class SegmentLoaderLocalCacheManagerTest
     final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
 
     final List<StorageLocationConfig> locations = new ArrayList<>();
-    final StorageLocationConfig locationConfig = new StorageLocationConfig();
-    locationConfig.setPath(localStorageFolder);
-    locationConfig.setMaxSize(10000000000L);
+    final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10000000000L, null);
     locations.add(locationConfig);
-    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
     final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
-    locationConfig2.setPath(localStorageFolder2);
-    locationConfig2.setMaxSize(1000000000L);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 1000000000L, null);
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
@@ -207,17 +201,13 @@ public class SegmentLoaderLocalCacheManagerTest
   public void testRetrySuccessAtSecondLocation() throws Exception
   {
     final List<StorageLocationConfig> locations = new ArrayList<>();
-    final StorageLocationConfig locationConfig = new StorageLocationConfig();
     final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
     // mock can't write in first location
     localStorageFolder.setWritable(false);
-    locationConfig.setPath(localStorageFolder);
-    locationConfig.setMaxSize(1000000000L);
+    final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null);
     locations.add(locationConfig);
-    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
     final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
-    locationConfig2.setPath(localStorageFolder2);
-    locationConfig2.setMaxSize(10000000L);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
@@ -260,19 +250,15 @@ public class SegmentLoaderLocalCacheManagerTest
   public void testRetryAllFail() throws Exception
   {
     final List<StorageLocationConfig> locations = new ArrayList<>();
-    final StorageLocationConfig locationConfig = new StorageLocationConfig();
     final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
     // mock can't write in first location
     localStorageFolder.setWritable(false);
-    locationConfig.setPath(localStorageFolder);
-    locationConfig.setMaxSize(1000000000L);
+    final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 1000000000L, null);
     locations.add(locationConfig);
-    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
     final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
     // mock can't write in second location
     localStorageFolder2.setWritable(false);
-    locationConfig2.setPath(localStorageFolder2);
-    locationConfig2.setMaxSize(10000000L);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10000000L, null);
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
@@ -316,17 +302,13 @@ public class SegmentLoaderLocalCacheManagerTest
   public void testEmptyToFullOrder() throws Exception
   {
     final List<StorageLocationConfig> locations = new ArrayList<>();
-    final StorageLocationConfig locationConfig = new StorageLocationConfig();
     final File localStorageFolder = tmpFolder.newFolder("local_storage_folder");
     localStorageFolder.setWritable(true);
-    locationConfig.setPath(localStorageFolder);
-    locationConfig.setMaxSize(10L);
+    final StorageLocationConfig locationConfig = new StorageLocationConfig(localStorageFolder, 10L, null);
     locations.add(locationConfig);
-    final StorageLocationConfig locationConfig2 = new StorageLocationConfig();
     final File localStorageFolder2 = tmpFolder.newFolder("local_storage_folder2");
     localStorageFolder2.setWritable(true);
-    locationConfig2.setPath(localStorageFolder2);
-    locationConfig2.setMaxSize(10L);
+    final StorageLocationConfig locationConfig2 = new StorageLocationConfig(localStorageFolder2, 10L, null);
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
diff --git a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
index cc6b6fe..cdfcd47 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/StorageLocationTest.java
@@ -71,25 +71,25 @@ public class StorageLocationTest
 
     final DataSegment secondSegment = makeSegment("2012-01-02/2012-01-03", 23);
 
-    loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10));
+    loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
     expectedAvail -= 10;
     verifyLoc(expectedAvail, loc);
 
-    loc.addSegment(makeSegment("2012-01-01/2012-01-02", 10));
+    loc.addSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
     verifyLoc(expectedAvail, loc);
 
-    loc.addSegment(secondSegment);
+    loc.addSegmentDir(new File("test2"), secondSegment);
     expectedAvail -= 23;
     verifyLoc(expectedAvail, loc);
 
-    loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10));
+    loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
     expectedAvail += 10;
     verifyLoc(expectedAvail, loc);
 
-    loc.removeSegment(makeSegment("2012-01-01/2012-01-02", 10));
+    loc.removeSegmentDir(new File("test1"), makeSegment("2012-01-01/2012-01-02", 10));
     verifyLoc(expectedAvail, loc);
 
-    loc.removeSegment(secondSegment);
+    loc.removeSegmentDir(new File("test2"), secondSegment);
     expectedAvail += 23;
     verifyLoc(expectedAvail, loc);
   }
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
index 649c8b2..fdd89d1 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java
@@ -105,7 +105,7 @@ public class SegmentManagerThreadSafetyTest
           public List<StorageLocationConfig> getLocations()
           {
             return Collections.singletonList(
-                new StorageLocationConfig().setPath(segmentCacheDir)
+                new StorageLocationConfig(segmentCacheDir, null, null)
             );
           }
         },
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index eeb8ae1..68d374c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -29,6 +29,7 @@ import com.google.inject.multibindings.MapBinder;
 import com.google.inject.name.Names;
 import com.google.inject.util.Providers;
 import io.airlift.airline.Command;
+import org.apache.druid.client.indexing.HttpIndexingServiceClient;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.discovery.NodeType;
 import org.apache.druid.discovery.WorkerNodeService;
@@ -53,6 +54,7 @@ import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.WorkerCuratorCoordinator;
 import org.apache.druid.indexing.worker.WorkerTaskMonitor;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
+import org.apache.druid.indexing.worker.http.ShuffleResource;
 import org.apache.druid.indexing.worker.http.TaskManagementResource;
 import org.apache.druid.indexing.worker.http.WorkerResource;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -101,7 +103,7 @@ public class CliMiddleManager extends ServerRunnable
             binder.bind(TaskRunner.class).to(ForkingTaskRunner.class);
             binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
 
-            binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null));
+            binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
             binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
                   .toProvider(Providers.of(null));
             binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
@@ -129,6 +131,7 @@ public class CliMiddleManager extends ServerRunnable
                   .in(LazySingleton.class);
             Jerseys.addResource(binder, WorkerResource.class);
             Jerseys.addResource(binder, TaskManagementResource.class);
+            Jerseys.addResource(binder, ShuffleResource.class);
 
             LifecycleModule.register(binder, Server.class);
 

