Posted to commits@druid.apache.org by ch...@apache.org on 2023/05/12 23:51:07 UTC

[druid] branch master updated: Be able to load segments on Peons (#14239)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f9861808bc Be able to load segments on Peons (#14239)
f9861808bc is described below

commit f9861808bc7e1ce4d7358260e024540d13015aad
Author: imply-cheddar <86...@users.noreply.github.com>
AuthorDate: Sat May 13 08:51:00 2023 +0900

    Be able to load segments on Peons (#14239)
    
    * Be able to load segments on Peons
    
    This change introduces a new config on WorkerConfig
    that indicates how many bytes of each storage
    location a task may use.  That value is divided
    up amongst the locations and slots and then used
    to set TaskConfig.tmpStorageBytesPerTask.
    
    The Peons use their local task dir and
    tmpStorageBytesPerTask as their StorageLocations for
    the SegmentManager such that they can accept broadcast
    segments.
---
 docs/configuration/index.md                        |   4 +-
 docs/ingestion/tasks.md                            |  13 +
 docs/multi-stage-query/reference.md                |   5 +-
 .../dropwizard/DropwizardEmitterConfigTest.java    |   2 +-
 .../mysql/MySQLMetadataStorageModuleTest.java      |   6 +-
 .../indexing/common/TaskStorageDirTracker.java     | 220 ++++++++++--
 .../druid/indexing/common/TaskToolboxFactory.java  |   5 +-
 .../druid/indexing/common/config/TaskConfig.java   |  21 ++
 .../overlord/BaseRestorableTaskRunner.java         |  22 +-
 .../druid/indexing/overlord/ForkingTaskRunner.java | 191 +++++-----
 .../indexing/overlord/ThreadingTaskRunner.java     |  21 +-
 .../indexing/common/TaskStorageDirTrackerTest.java | 229 ++++++++++--
 .../indexing/overlord/ForkingTaskRunnerTest.java   | 160 ++++++---
 .../indexing/overlord/TestTaskToolboxFactory.java  | 388 +++++++++++++++++++++
 .../indexing/overlord/ThreadingTaskRunnerTest.java |  13 +-
 .../config/RemoteTaskRunnerConfigTest.java         |   4 +-
 .../server/initialization/IndexerZkConfigTest.java |  10 +-
 .../org/apache/druid/guice/JsonConfigProvider.java |  33 +-
 ...va => ProviderBasedGoogleSupplierProvider.java} |  22 +-
 .../druid/segment/column/StringValueSetIndex.java  |   3 +-
 .../apache/druid/query/DefaultQueryConfigTest.java |   4 +-
 .../druid/indexing/worker/config/WorkerConfig.java | 137 +++++++-
 .../initialization/AuthenticatorMapperModule.java  |   7 +-
 .../initialization/AuthorizerMapperModule.java     |   7 +-
 .../apache/druid/client/cache/CacheConfigTest.java |  14 +-
 .../druid/client/cache/CaffeineCacheTest.java      |   6 +-
 .../apache/druid/curator/CuratorConfigTest.java    |   2 +-
 .../apache/druid/guice/JsonConfigTesterBase.java   |   2 +-
 .../druid/guice/JsonConfigTesterBaseTest.java      |   2 +-
 .../druid/guice/security/DruidAuthModuleTest.java  |   2 +-
 .../indexing/worker/config/WorkerConfigTest.java   |  45 +--
 .../druid/initialization/ZkPathsConfigTest.java    |   2 +-
 .../lookup/LookupListeningAnnouncerConfigTest.java |  10 +-
 .../apache/druid/server/QuerySchedulerTest.java    |  14 +-
 .../log/LoggingRequestLoggerProviderTest.java      |   6 +-
 .../java/org/apache/druid/cli/CliCoordinator.java  |   7 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |  34 +-
 .../druid/sql/avatica/AvaticaModuleTest.java       |   6 +-
 website/.spelling                                  |   1 +
 39 files changed, 1309 insertions(+), 371 deletions(-)
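
As a rough illustration of the division described in the commit message, here
is a minimal sketch that mirrors the arithmetic of the new
TaskStorageDirTracker.fromBaseDirs in the diff below (the class name and the
concrete numbers are invented):

    public class SlotSizeExample
    {
      public static void main(String[] args)
      {
        final int numSlots = 7;             // druid.worker.capacity
        final int numBaseDirs = 3;          // entries in druid.worker.baseTaskDirs
        final long dirSize = 100_000_000L;  // druid.worker.baseTaskDirSize

        int slotsPerBaseTaskDir = numSlots / numBaseDirs;
        if (slotsPerBaseTaskDir == 0) {
          slotsPerBaseTaskDir = 1;          // at least one slot per directory
        } else if (numSlots % numBaseDirs > 0) {
          ++slotsPerBaseTaskDir;            // round up when slots do not divide evenly
        }
        final long sizePerSlot = dirSize / slotsPerBaseTaskDir;

        // 7 slots over 3 dirs -> 3 slots per dir -> 33333333 bytes per slot,
        // which becomes TaskConfig.tmpStorageBytesPerTask for each task.
        System.out.println(sizePerSlot);
      }
    }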

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f5d739dd4f..85437dc12b 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1463,6 +1463,7 @@ Middle managers pass their configurations down to their child peons. The MiddleM
 |`druid.worker.version`|Version identifier for the MiddleManager. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.indexer.runner.minWorkerVersion`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons.|0|
 |`druid.worker.capacity`|Maximum number of tasks the MiddleManager can accept.|Number of CPUs on the machine - 1|
 |`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`.  If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used.  Example: `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
+|`druid.worker.baseTaskDirSize`|The total number of bytes that tasks may use on any single task dir.  This value applies symmetrically across all directories; that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks.  The actual amount of disk storage assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tas [...]
 |`druid.worker.category`|A string to name the category that the MiddleManager node belongs to.|`_default_worker_category`|
 
 #### Peon Processing
@@ -1526,7 +1527,7 @@ Additional peon configs include:
 |`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
 |`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch-input-source.md) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
 |`druid.indexer.task.storeEmptyColumns`|Boolean value for whether or not to store empty columns during ingestion. When set to true, Druid stores every column specified in the [`dimensionsSpec`](../ingestion/ingestion-spec.md#dimensionsspec). If you use schemaless ingestion and don't specify any dimensions to ingest, you must also set [`includeAllDimensions`](../ingestion/ingestion-spec.md#dimensionsspec) for Druid to store empty columns.<br/><br/>If you set `storeEmptyColumns` to false,  [...]
-|`druid.indexer.task.tmpStorageBytesPerTask`|Maximum number of bytes per task to be used to store temporary files on disk. This usage is split among all temporary storage usages for the task. An exception might be thrown if this limit is too low for the task or if this limit would be exceeded. This limit is currently respected only by MSQ tasks. Other types of tasks might exceed this limit. A value of -1 disables this limit.  |-1|
+|`druid.indexer.task.tmpStorageBytesPerTask`|Maximum number of bytes per task to be used to store temporary files on disk. This config is generally intended for internal usage.  Any value you set is very likely to be overwritten by the TaskRunner that executes the task, so be sure you understand the expected behavior before adjusting this parameter directly.  The config is documented here primarily to provide an understanding of what it means if/when someone sees that it has been [...]
 |`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
 
 If the peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:
@@ -1584,6 +1585,7 @@ then the value from the configuration below is used:
 |`druid.worker.version`|Version identifier for the Indexer.|0|
 |`druid.worker.capacity`|Maximum number of tasks the Indexer can accept.|Number of available processors - 1|
 |`druid.worker.baseTaskDirs`|List of base temporary working directories, one of which is assigned per task in a round-robin fashion. This property can be used to allow usage of multiple disks for indexing. This property is recommended in place of and takes precedence over `${druid.indexer.task.baseTaskDir}`.  If this configuration is not set, `${druid.indexer.task.baseTaskDir}` is used.  Example: `druid.worker.baseTaskDirs=[\"PATH1\",\"PATH2\",...]`.|null|
+|`druid.worker.baseTaskDirSize`|The total number of bytes that tasks may use on any single task dir.  This value applies symmetrically across all directories; that is, if this is 500 GB and there are 3 `baseTaskDirs`, then each of those task directories is assumed to allow 500 GB to be used and a total of 1.5 TB will potentially be available across all tasks.  The actual amount of disk storage assigned to each task is discussed in [Configuring task storage sizes](../ingestion/tas [...]
 |`druid.worker.globalIngestionHeapLimitBytes`|Total amount of heap available for ingestion processing. This is applied by automatically setting the `maxBytesInMemory` property on tasks.|60% of configured JVM heap|
 |`druid.worker.numConcurrentMerges`|Maximum number of segment persist or merge operations that can run concurrently across all tasks.|`druid.worker.capacity` / 2, rounded down|
 |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
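
A hypothetical sketch of how the worker properties documented above might be
set together; the paths, the capacity of 6, and the 500 GB size are invented
for illustration:

    import java.util.Properties;

    public class WorkerStorageProperties
    {
      public static void main(String[] args)
      {
        final Properties props = new Properties();
        props.setProperty("druid.worker.capacity", "6");
        props.setProperty(
            "druid.worker.baseTaskDirs",
            "[\"/mnt/disk1/task\",\"/mnt/disk2/task\",\"/mnt/disk3/task\"]"
        );
        // 500 GB allowed on each of the three directories (1.5 TB potential total).
        props.setProperty("druid.worker.baseTaskDirSize", String.valueOf(500_000_000_000L));

        // Six slots spread over three directories is two slots per directory,
        // so each task would be allowed roughly 250 GB of temporary storage.
        System.out.println(props);
      }
    }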
diff --git a/docs/ingestion/tasks.md b/docs/ingestion/tasks.md
index d169bc4247..979d6d88d7 100644
--- a/docs/ingestion/tasks.md
+++ b/docs/ingestion/tasks.md
@@ -419,6 +419,19 @@ You can configure retention periods for logs in milliseconds by setting `druid.i
 
 > Automatic log file deletion typically works based on the log file's 'modified' timestamp in the back-end store.  Large clock skews between Druid processes and the long-term store might result in unintended behavior.
 
+## Configuring task storage sizes
+
+Tasks sometimes need to use local disk to store data while they are active.  For example, realtime ingestion tasks use it to accept broadcast segments for broadcast joins, and multi-stage query jobs use it for intermediate data sets.
+
+Task storage sizes are configured through a combination of three properties:
+1. `druid.worker.capacity` - the number of task slots
+2. `druid.worker.baseTaskDirs` - the list of directories to use for task storage
+3. `druid.worker.baseTaskDirSize` - the amount of storage to use on each storage location
+
+Although it might seem like one task could use multiple directories, any given task uses only one directory from the list of base directories; as such, each task is given a single directory for scratch space.
+
+The actual amount of disk storage assigned to any given task is computed by determining the largest size that enables all task slots to be given an equivalent amount of disk storage.  For example, with 5 slots, 2 directories (A and B), and a size of 300 GB, directory A would be given 3 slots, directory B would be given 2 slots, and each slot would be allowed 100 GB.
+
 ## All task types
 
 ### `index_parallel`
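
The allocation example above (5 slots, directories A and B, 300 GB each) maps
tasks to slot directories as sketched here; the layout mirrors the slot naming
used by the new TaskStorageDirTracker later in this commit:

    import java.io.File;

    public class SlotLayoutExample
    {
      public static void main(String[] args)
      {
        final File[] baseTaskDirs = {new File("A"), new File("B")};
        final int numSlots = 5;

        for (int i = 0; i < numSlots; ++i) {
          final File dir = baseTaskDirs[i % baseTaskDirs.length];
          final int slotNumber = i / baseTaskDirs.length;
          System.out.println("slot " + i + " -> " + new File(dir, "slot" + slotNumber));
        }
        // Prints A/slot0, B/slot0, A/slot1, B/slot1, A/slot2: directory A hosts
        // three slots, directory B hosts two, and 300 GB / 3 = 100 GB per slot.
      }
    }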
diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 171d842e7e..f7b9ab8e66 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -344,9 +344,8 @@ the storage connector to work upon the durable storage. The durable storage loca
 for cluster's MSQ tasks. If the location contains other files or directories, then they will get cleaned up as well.
 
 Enabling durable storage also enables the use of local disk to store temporary files, such as the intermediate files produced
-by the super sorter. The limit set by `druid.indexer.task.tmpStorageBytesPerTask` for maximum number of bytes of local
-storage to be used per task will be respected by MSQ tasks. If the configured limit is too low, `NotEnoughTemporaryStorageFault`
-may be thrown.
+by the super sorter.  Tasks will use whatever has been configured for their temporary usage as described in [Configuring task storage sizes](../ingestion/tasks.md#configuring-task-storage-sizes).
+If the configured limit is too low, `NotEnoughTemporaryStorageFault` may be thrown.
 
 ### Enable durable storage
 
diff --git a/extensions-contrib/dropwizard-emitter/src/test/java/org/apache/druid/emitter/dropwizard/DropwizardEmitterConfigTest.java b/extensions-contrib/dropwizard-emitter/src/test/java/org/apache/druid/emitter/dropwizard/DropwizardEmitterConfigTest.java
index 1ace634f9e..174ad51de8 100644
--- a/extensions-contrib/dropwizard-emitter/src/test/java/org/apache/druid/emitter/dropwizard/DropwizardEmitterConfigTest.java
+++ b/extensions-contrib/dropwizard-emitter/src/test/java/org/apache/druid/emitter/dropwizard/DropwizardEmitterConfigTest.java
@@ -74,7 +74,7 @@ public class DropwizardEmitterConfigTest extends JsonConfigTesterBase<Dropwizard
     propertyValues.put(getPropertyKey("includeHost"), "true");
     testProperties.putAll(propertyValues);
     configProvider.inject(testProperties, configurator);
-    DropwizardEmitterConfig config = configProvider.get().get();
+    DropwizardEmitterConfig config = configProvider.get();
     Assert.assertTrue("IncludeHost", config.getIncludeHost());
     Assert.assertEquals("test-prefix", config.getPrefix());
     Assert.assertEquals(1, config.getReporters().size());
diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java
index 4c395061e8..0e2d6c359b 100644
--- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java
+++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLMetadataStorageModuleTest.java
@@ -62,7 +62,7 @@ public class MySQLMetadataStorageModuleTest
     properties.setProperty(propertyPrefix + ".enabledTLSProtocols", "[\"some\", \"protocols\"]");
     properties.setProperty(propertyPrefix + ".verifyServerCertificate", "true");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final MySQLConnectorSslConfig config = provider.get().get();
+    final MySQLConnectorSslConfig config = provider.get();
     Assert.assertTrue(config.isUseSSL());
     Assert.assertEquals("url", config.getTrustCertificateKeyStoreUrl());
     Assert.assertEquals("type", config.getTrustCertificateKeyStoreType());
@@ -86,7 +86,7 @@ public class MySQLMetadataStorageModuleTest
     );
     final Properties properties = new Properties();
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final MySQLConnectorDriverConfig config = provider.get().get();
+    final MySQLConnectorDriverConfig config = provider.get();
     Assert.assertEquals(new MySQLConnectorDriverConfig().getDriverClassName(), config.getDriverClassName());
   }
 
@@ -102,7 +102,7 @@ public class MySQLMetadataStorageModuleTest
     final Properties properties = new Properties();
     properties.setProperty(propertyPrefix + ".driverClassName", "some.driver.classname");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final MySQLConnectorDriverConfig config = provider.get().get();
+    final MySQLConnectorDriverConfig config = provider.get();
     Assert.assertEquals("some.driver.classname", config.getDriverClassName());
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
index b23d374bbd..5605066595 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskStorageDirTracker.java
@@ -23,92 +23,240 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.logger.Logger;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+/**
+ * Used to pick storage slots for tasks when run from the middle manager.
+ */
 public class TaskStorageDirTracker
 {
+  private static final Logger log = new Logger(TaskStorageDirTracker.class);
+
   public static TaskStorageDirTracker fromConfigs(WorkerConfig workerConfig, TaskConfig taskConfig)
   {
-    if (workerConfig == null) {
-      return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
+    final List<String> basePaths = workerConfig.getBaseTaskDirs();
+    final List<File> baseTaskDirs;
+
+    if (basePaths == null) {
+      baseTaskDirs = ImmutableList.of(taskConfig.getBaseTaskDir());
     } else {
-      final List<String> basePaths = workerConfig.getBaseTaskDirs();
-      if (basePaths == null) {
-        return new TaskStorageDirTracker(ImmutableList.of(taskConfig.getBaseTaskDir()));
-      }
-      return new TaskStorageDirTracker(
-          basePaths.stream().map(File::new).collect(Collectors.toList())
-      );
+      baseTaskDirs = basePaths.stream().map(File::new).collect(Collectors.toList());
+    }
+
+    return fromBaseDirs(baseTaskDirs, workerConfig.getCapacity(), workerConfig.getBaseTaskDirSize());
+  }
+
+  public static TaskStorageDirTracker fromBaseDirs(List<File> baseTaskDirs, int numSlots, long dirSize)
+  {
+    int slotsPerBaseTaskDir = numSlots / baseTaskDirs.size();
+    if (slotsPerBaseTaskDir == 0) {
+      slotsPerBaseTaskDir = 1;
+    } else if (numSlots % baseTaskDirs.size() > 0) {
+      // We have to add an extra slot per location if they do not evenly divide
+      ++slotsPerBaseTaskDir;
     }
+    long sizePerSlot = dirSize / slotsPerBaseTaskDir;
+
+    File[] slotDirs = new File[numSlots];
+    for (int i = 0; i < numSlots; ++i) {
+      final int whichDir = i % baseTaskDirs.size();
+      final int dirUsageCount = i / baseTaskDirs.size();
+      slotDirs[i] = new File(baseTaskDirs.get(whichDir), StringUtils.format("slot%d", dirUsageCount));
+    }
+
+    return new TaskStorageDirTracker(baseTaskDirs, slotDirs, sizePerSlot);
   }
 
+  /**
+   * The base task dirs.  This field exists primarily for compatibility with scheduling that was done
+   * before TaskStorageDirTracker was introduced, when all of the tasks were simply placed together
+   * in one directory.  If we want to be able to restore those tasks, we need to be able to find them
+   * at the old locations, and that is why this exists.
+   */
   private final File[] baseTaskDirs;
-  // Initialize to a negative number because it ensures that we can handle the overflow-rollover case
+
+  /**
+   * These are slots pre-divided to keep disk sizing considerations aligned.  The normal operation of this
+   * class is to round-robin across these slots.
+   */
+  private final StorageSlot[] slots;
+
+  /**
+   * A counter used to simplify round-robin allocation.  We initialize it to a negative value because it
+   * simplifies testing/ensuring that we can handle overflow-rollover of the integer.
+   */
   private final AtomicInteger iterationCounter = new AtomicInteger(Integer.MIN_VALUE);
 
-  public TaskStorageDirTracker(List<File> baseTaskDirs)
+  private TaskStorageDirTracker(List<File> baseTaskDirs, File[] slotDirs, long sizePerSlot)
   {
     this.baseTaskDirs = baseTaskDirs.toArray(new File[0]);
+    this.slots = new StorageSlot[slotDirs.length];
+    for (int i = 0; i < slotDirs.length; ++i) {
+      slots[i] = new StorageSlot(slotDirs[i], sizePerSlot);
+    }
   }
 
   @LifecycleStart
   public void ensureDirectories()
   {
-    for (File baseTaskDir : baseTaskDirs) {
-      if (!baseTaskDir.exists()) {
+    for (StorageSlot slot : slots) {
+      if (!slot.getDirectory().exists()) {
         try {
-          FileUtils.mkdirp(baseTaskDir);
+          FileUtils.mkdirp(slot.getDirectory());
         }
         catch (IOException e) {
           throw new ISE(
               e,
-              "base task directory [%s] likely does not exist, please ensure it exists and the user has permissions.",
-              baseTaskDir
+              "directory for slot [%s] likely does not exist, please ensure it exists and the user has permissions.",
+              slot
           );
         }
       }
     }
   }
 
-  public File pickBaseDir(String taskId)
+  public synchronized StorageSlot pickStorageSlot(String taskId)
   {
-    if (baseTaskDirs.length == 1) {
-      return baseTaskDirs[0];
+    // if the task directory already exists, we want to give it precedence, so check.
+    for (StorageSlot slot : slots) {
+      if (slot.runningTaskId != null && slot.runningTaskId.equals(taskId)) {
+        return slot;
+      }
     }
 
-    // if the task directory already exists, we want to give it precedence, so check.
-    for (File baseTaskDir : baseTaskDirs) {
-      if (new File(baseTaskDir, taskId).exists()) {
-        return baseTaskDir;
+    // if it doesn't exist, pick one round-robin and ensure it is unused.
+    for (int i = 0; i < slots.length; ++i) {
+      // This can be negative, so abs() it.
+      final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % slots.length);
+      final StorageSlot candidateSlot = slots[currIncrement % slots.length];
+      if (candidateSlot.runningTaskId == null) {
+        candidateSlot.runningTaskId = taskId;
+        return candidateSlot;
       }
     }
+    throw new ISE("Unable to pick a free slot, this should never happen, slot status [%s].", Arrays.toString(slots));
+  }
 
-    // if it doesn't exist, pick one round-robin and return.  This can be negative, so abs() it
-    final int currIncrement = Math.abs(iterationCounter.getAndIncrement() % baseTaskDirs.length);
-    return baseTaskDirs[currIncrement % baseTaskDirs.length];
+  public synchronized void returnStorageSlot(StorageSlot slot)
+  {
+    if (slot.getParentRef() == this) {
+      slot.runningTaskId = null;
+    } else {
+      throw new IAE("Cannot return storage slot for task [%s] that I don't own.", slot.runningTaskId);
+    }
   }
 
-  @Nullable
-  public File findExistingTaskDir(String taskId)
+  /**
+   * Finds directories that might already exist for the list of tasks.  Useful in restoring tasks upon restart.
+   *
+   * @param taskIds the ids to find and restore
+   * @return a map of taskId to the StorageSlot for that task.  Contains null values for ids that couldn't be found
+   */
+  public synchronized Map<String, StorageSlot> findExistingTaskDirs(List<String> taskIds)
   {
-    if (baseTaskDirs.length == 1) {
-      return new File(baseTaskDirs[0], taskId);
+    // Use a tree map because we don't expect this to be too large, but it's nice to have the keys sorted
+    // if this ever gets printed out.
+    Map<String, StorageSlot> retVal = new TreeMap<>();
+    List<String> missingIds = new ArrayList<>();
+
+
+    // We need to start by looking for the tasks in current slot locations so that we ensure that we have
+    // correct in-memory accounting for anything that is currently running in a known slot.  After that, for
+    // compatibility with an old implementation, we need to check the base directories to see if any of
+    // the tasks are running in the legacy locations and assign them to one of the free task slots.
+    for (String taskId : taskIds) {
+      StorageSlot candidateSlot = Arrays.stream(slots)
+                                        .filter(slot -> slot.runningTaskId == null)
+                                        .filter(slot -> new File(slot.getDirectory(), taskId).exists())
+                                        .findFirst()
+                                        .orElse(null);
+
+      if (candidateSlot == null) {
+        missingIds.add(taskId);
+      } else {
+        candidateSlot.runningTaskId = taskId;
+        retVal.put(taskId, candidateSlot);
+      }
     }
 
-    for (File baseTaskDir : baseTaskDirs) {
-      final File candidateLocation = new File(baseTaskDir, taskId);
-      if (candidateLocation.exists()) {
-        return candidateLocation;
+    for (String missingId : missingIds) {
+      File definitelyExists = null;
+      for (File baseTaskDir : baseTaskDirs) {
+        File maybeExists = new File(baseTaskDir, missingId);
+        if (maybeExists.exists()) {
+          definitelyExists = maybeExists;
+          break;
+        }
       }
+
+      if (definitelyExists == null) {
+        retVal.put(missingId, null);
+      } else {
+        final StorageSlot pickedSlot = pickStorageSlot(missingId);
+        final File pickedLocation = new File(pickedSlot.getDirectory(), missingId);
+        if (definitelyExists.renameTo(pickedLocation)) {
+          retVal.put(missingId, pickedSlot);
+        } else {
+          log.warn("Unable to relocate task ([%s] -> [%s]), pretend it didn't exist", definitelyExists, pickedLocation);
+          retVal.put(missingId, null);
+          returnStorageSlot(pickedSlot);
+        }
+      }
+    }
+
+    return retVal;
+  }
+
+  public class StorageSlot
+  {
+    private final File directory;
+    private final long numBytes;
+
+    private volatile String runningTaskId = null;
+
+    private StorageSlot(File baseDirectory, long numBytes)
+    {
+      this.directory = baseDirectory;
+      this.numBytes = numBytes;
+    }
+
+    public File getDirectory()
+    {
+      return directory;
+    }
+
+    public long getNumBytes()
+    {
+      return numBytes;
+    }
+
+    public TaskStorageDirTracker getParentRef()
+    {
+      return TaskStorageDirTracker.this;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "StorageSlot{" +
+             "directory=" + directory +
+             ", numBytes=" + numBytes +
+             ", runningTaskId='" + runningTaskId + '\'' +
+             '}';
     }
-    return null;
   }
 }
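
A minimal usage sketch of the tracker above, showing the pick/return lifecycle
(paths, capacity, and sizes are invented):

    import com.google.common.collect.ImmutableList;
    import org.apache.druid.indexing.common.TaskStorageDirTracker;

    import java.io.File;

    public class TrackerLifecycleExample
    {
      public static void main(String[] args)
      {
        final TaskStorageDirTracker tracker = TaskStorageDirTracker.fromBaseDirs(
            ImmutableList.of(new File("/tmp/A"), new File("/tmp/B")),
            4,               // worker capacity, i.e. number of slots
            100_000_000L     // bytes allowed per base directory
        );
        tracker.ensureDirectories();

        // Picking again with the same task id returns the same slot.
        final TaskStorageDirTracker.StorageSlot slot = tracker.pickStorageSlot("my-task");

        // ... run the task inside new File(slot.getDirectory(), "my-task") ...

        // Returning the slot frees it for the next task; returning a slot owned
        // by a different tracker instance throws IAE.
        tracker.returnStorageSlot(slot);
      }
    }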
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
index fc288bbee2..d95e5453ff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolboxFactory.java
@@ -64,6 +64,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.tasklogs.TaskLogPusher;
 
 import java.io.File;
+import java.util.function.Function;
 
 /**
  * Stuff that may be needed by a Task in order to conduct its business.
@@ -198,9 +199,9 @@ public class TaskToolboxFactory
     return build(config, task);
   }
 
-  public TaskToolbox build(File baseTaskDir, Task task)
+  public TaskToolbox build(Function<TaskConfig, TaskConfig> decoratorFn, Task task)
   {
-    return build(config.withBaseTaskDir(baseTaskDir), task);
+    return build(decoratorFn.apply(config), task);
   }
 
   public TaskToolbox build(TaskConfig config, Task task)
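
A short sketch of how a runner can use the new decorator overload above to
hand per-slot settings to the toolbox; this mirrors the ThreadingTaskRunner
change later in this commit (the helper class itself is invented):

    import org.apache.druid.indexing.common.TaskStorageDirTracker;
    import org.apache.druid.indexing.common.TaskToolbox;
    import org.apache.druid.indexing.common.TaskToolboxFactory;
    import org.apache.druid.indexing.common.task.Task;

    public class ToolboxDecoratorExample
    {
      // Decorate the injected TaskConfig with per-slot settings before the
      // toolbox is built, rather than passing a bare base directory.
      static TaskToolbox buildFor(
          TaskToolboxFactory factory,
          TaskStorageDirTracker.StorageSlot slot,
          Task task
      )
      {
        return factory.build(
            config -> config.withBaseTaskDir(slot.getDirectory())
                            .withTmpStorageBytesPerTask(slot.getNumBytes()),
            task
        );
      }
    }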
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index c10b496389..db48d6f07f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -373,4 +373,25 @@ public class TaskConfig
         tmpStorageBytesPerTask
     );
   }
+
+  public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask)
+  {
+    return new TaskConfig(
+        baseDir,
+        baseTaskDir,
+        hadoopWorkingPath,
+        defaultRowFlushBoundary,
+        defaultHadoopCoordinates,
+        restoreTasksOnRestart,
+        gracefulShutdownTimeout,
+        directoryLockTimeout,
+        shuffleDataLocations,
+        ignoreTimestampSpecForDruidInputSource,
+        batchMemoryMappedIndex,
+        batchProcessingMode,
+        storeEmptyColumns,
+        encapsulatedTask,
+        tmpStorageBytesPerTask
+    );
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
index 489afaf5fe..be2008ab14 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/BaseRestorableTaskRunner.java
@@ -33,6 +33,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskStorageDirTracker;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -42,6 +43,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
@@ -98,18 +100,32 @@ public abstract class BaseRestorableTaskRunner<WorkItemType extends TaskRunnerWo
     }
 
     final List<Pair<Task, ListenableFuture<TaskStatus>>> retVal = new ArrayList<>();
+    final Map<String, TaskStorageDirTracker.StorageSlot> existingTaskDirs =
+        tracker.findExistingTaskDirs(taskRestoreInfo.getRunningTasks());
     for (final String taskId : taskRestoreInfo.getRunningTasks()) {
+
+      final TaskStorageDirTracker.StorageSlot storageSlot = existingTaskDirs.get(taskId);
+      if (storageSlot == null) {
+        LOG.warn("restorable task [%s] didn't actually exist!?", taskId);
+        continue;
+      }
+
       try {
-        final File taskFile = new File(tracker.findExistingTaskDir(taskId), "task.json");
+        final File taskFile = storageSlot.getDirectory().toPath().resolve(taskId).resolve("task.json").toFile();
         final Task task = jsonMapper.readValue(taskFile, Task.class);
 
         if (!task.getId().equals(taskId)) {
-          throw new ISE("Task[%s] restore file had wrong id[%s]", taskId, task.getId());
+          throw new ISE("Task [%s] restore file had wrong id[%s]", taskId, task.getId());
         }
 
         if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
           LOG.info("Restoring task[%s].", task.getId());
           retVal.add(Pair.of(task, run(task)));
+        } else {
+          final File dir = new File(storageSlot.getDirectory(), taskId);
+          LOG.info("Task [%s] is not restorable, cleaning up the directory [%s]", taskId, dir);
+          tracker.returnStorageSlot(storageSlot);
+          FileUtils.deleteDirectory(dir);
         }
       }
       catch (Exception e) {
@@ -118,7 +134,7 @@ public abstract class BaseRestorableTaskRunner<WorkItemType extends TaskRunnerWo
     }
 
     if (!retVal.isEmpty()) {
-      LOG.info("Restored %,d tasks: %s", retVal.size(), Joiner.on(", ").join(retVal));
+      LOG.info("Restored [%,d] tasks: [%s]", retVal.size(), Joiner.on(", ").join(retVal));
     }
 
     return retVal;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 0020ff2198..8df3a69eee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -27,7 +27,6 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteSink;
 import com.google.common.io.ByteStreams;
@@ -154,20 +153,19 @@ public class ForkingTaskRunner
                 @Override
                 public TaskStatus call()
                 {
-
-                  final File baseDirForTask;
+                  final TaskStorageDirTracker.StorageSlot storageSlot;
                   try {
-                    baseDirForTask = getTracker().pickBaseDir(task.getId());
+                    storageSlot = getTracker().pickStorageSlot(task.getId());
                   }
                   catch (RuntimeException e) {
-                    LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
+                    LOG.warn(e, "Failed to get storage slot for task [%s], cannot schedule.", task.getId());
                     return TaskStatus.failure(
                         task.getId(),
-                        StringUtils.format("Could not schedule due to error [%s]", e.getMessage())
+                        StringUtils.format("Failed to get storage slot due to error [%s]", e.getMessage())
                     );
                   }
 
-                  final File taskDir = new File(baseDirForTask, task.getId());
+                  final File taskDir = new File(storageSlot.getDirectory(), task.getId());
                   final String attemptId = String.valueOf(getNextAttemptID(taskDir));
                   final File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", attemptId).toFile();
 
@@ -214,7 +212,7 @@ public class ForkingTaskRunner
                           throw new ISE("TaskInfo already has processHolder for task[%s]!", task.getId());
                         }
 
-                        final List<String> command = new ArrayList<>();
+                        final CommandListBuilder command = new CommandListBuilder();
                         final String taskClasspath;
                         if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
                           taskClasspath = Joiner.on(File.pathSeparator).join(
@@ -236,18 +234,15 @@ public class ForkingTaskRunner
 
                         command.add(StringUtils.format("-XX:ActiveProcessorCount=%d", numProcessorsPerTask));
 
-                        Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
-                        Iterables.addAll(command, config.getJavaOptsArray());
+                        command.addAll(new QuotableWhiteSpaceSplitter(config.getJavaOpts()));
+                        command.addAll(config.getJavaOptsArray());
 
                         // Override task specific javaOpts
                         Object taskJavaOpts = task.getContextValue(
                             ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY
                         );
                         if (taskJavaOpts != null) {
-                          Iterables.addAll(
-                              command,
-                              new QuotableWhiteSpaceSplitter((String) taskJavaOpts)
-                          );
+                          command.addAll(new QuotableWhiteSpaceSplitter((String) taskJavaOpts));
                         }
 
                         // Override task specific javaOptsArray
@@ -257,7 +252,7 @@ public class ForkingTaskRunner
                               new TypeReference<List<String>>() {}
                           );
                           if (taskJavaOptsArray != null) {
-                            Iterables.addAll(command, taskJavaOptsArray);
+                            command.addAll(taskJavaOptsArray);
                           }
                         }
                         catch (Exception e) {
@@ -275,13 +270,7 @@ public class ForkingTaskRunner
                                 && !ForkingTaskRunnerConfig.JAVA_OPTS_PROPERTY.equals(propName)
                                 && !ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY.equals(propName)
                             ) {
-                              command.add(
-                                  StringUtils.format(
-                                  "-D%s=%s",
-                                  propName,
-                                  props.getProperty(propName)
-                                )
-                              );
+                              command.addSystemProperty(propName, props.getProperty(propName));
                             }
                           }
                         }
@@ -289,12 +278,9 @@ public class ForkingTaskRunner
                         // Override child JVM specific properties
                         for (String propName : props.stringPropertyNames()) {
                           if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
-                            command.add(
-                                StringUtils.format(
-                                "-D%s=%s",
+                            command.addSystemProperty(
                                 propName.substring(CHILD_PROPERTY_PREFIX.length()),
                                 props.getProperty(propName)
-                              )
                             );
                           }
                         }
@@ -304,82 +290,48 @@ public class ForkingTaskRunner
                         if (context != null) {
                           for (String propName : context.keySet()) {
                             if (propName.startsWith(CHILD_PROPERTY_PREFIX)) {
-                              command.add(
-                                  StringUtils.format(
-                                  "-D%s=%s",
+                              command.addSystemProperty(
                                   propName.substring(CHILD_PROPERTY_PREFIX.length()),
                                   task.getContextValue(propName)
-                                )
                               );
                             }
                           }
                         }
 
                         // add the attemptId as a system property
-                        command.add(
-                            StringUtils.format(
-                                "-D%s=%s",
-                                "attemptId",
-                                "1"
-                            )
-                        );
+                        command.addSystemProperty("attemptId", "1");
 
                         // Add dataSource, taskId and taskType for metrics or logging
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.DATASOURCE,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.DATASOURCE,
                             task.getDataSource()
-                          )
                         );
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.TASK_ID,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_ID,
                             task.getId()
-                          )
                         );
-                        command.add(
-                            StringUtils.format(
-                            "-D%s%s=%s",
-                            MonitorsConfig.METRIC_DIMENSION_PREFIX,
-                            DruidMetrics.TASK_TYPE,
+                        command.addSystemProperty(
+                            MonitorsConfig.METRIC_DIMENSION_PREFIX + DruidMetrics.TASK_TYPE,
                             task.getType()
-                          )
                         );
 
-                        command.add(StringUtils.format("-Ddruid.host=%s", childHost));
-                        command.add(StringUtils.format("-Ddruid.plaintextPort=%d", childPort));
-                        command.add(StringUtils.format("-Ddruid.tlsPort=%d", tlsChildPort));
+
+                        command.addSystemProperty("druid.host", childHost);
+                        command.addSystemProperty("druid.plaintextPort", childPort);
+                        command.addSystemProperty("druid.tlsPort", tlsChildPort);
 
                         // Let tasks know where they are running on.
                         // This information is used in native parallel indexing with shuffle.
-                        command.add(StringUtils.format("-Ddruid.task.executor.service=%s", node.getServiceName()));
-                        command.add(StringUtils.format("-Ddruid.task.executor.host=%s", node.getHost()));
-                        command.add(
-                            StringUtils.format("-Ddruid.task.executor.plaintextPort=%d", node.getPlaintextPort())
-                        );
-                        command.add(
-                            StringUtils.format(
-                                "-Ddruid.task.executor.enablePlaintextPort=%s",
-                                node.isEnablePlaintextPort()
-                            )
-                        );
-                        command.add(StringUtils.format("-Ddruid.task.executor.tlsPort=%d", node.getTlsPort()));
-                        command.add(
-                            StringUtils.format("-Ddruid.task.executor.enableTlsPort=%s", node.isEnableTlsPort())
-                        );
-                        command.add(StringUtils.format("-Dlog4j2.configurationFactory=%s", ConsoleLoggingEnforcementConfigurationFactory.class.getName()));
+                        command.addSystemProperty("druid.task.executor.service", node.getServiceName());
+                        command.addSystemProperty("druid.task.executor.host", node.getHost());
+                        command.addSystemProperty("druid.task.executor.plaintextPort", node.getPlaintextPort());
+                        command.addSystemProperty("druid.task.executor.enablePlaintextPort", node.isEnablePlaintextPort());
+                        command.addSystemProperty("druid.task.executor.tlsPort", node.getTlsPort());
+                        command.addSystemProperty("druid.task.executor.enableTlsPort", node.isEnableTlsPort());
+                        command.addSystemProperty("log4j2.configurationFactory", ConsoleLoggingEnforcementConfigurationFactory.class.getName());
 
-                        // These are not enabled per default to allow the user to either set or not set them
-                        // Users are highly suggested to be set in druid.indexer.runner.javaOpts
-                        // See org.apache.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
-                        // for more information
-                        // command.add("-XX:+UseThreadPriorities");
-                        // command.add("-XX:ThreadPriorityPolicy=42");
-                        command.add(StringUtils.format("-Ddruid.indexer.task.baseTaskDir=%s", baseDirForTask.getAbsolutePath()));
+                        command.addSystemProperty("druid.indexer.task.baseTaskDir", storageSlot.getDirectory().getAbsolutePath());
+                        command.addSystemProperty("druid.indexer.task.tmpStorageBytesPerTask", storageSlot.getNumBytes());
 
                         command.add("org.apache.druid.cli.Main");
                         command.add("internal");
@@ -405,9 +357,9 @@ public class ForkingTaskRunner
 
                         LOGGER.info(
                             "Running command: %s",
-                            getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
+                            getMaskedCommand(startupLoggingConfig.getMaskProperties(), command.getCommandList())
                         );
-                        taskWorkItem.processHolder = runTaskProcess(command, logFile, taskLocation);
+                        taskWorkItem.processHolder = runTaskProcess(command.getCommandList(), logFile, taskLocation);
 
                         processHolder = taskWorkItem.processHolder;
                         processHolder.registerWithCloser(closer);
@@ -477,6 +429,8 @@ public class ForkingTaskRunner
                         portFinder.markPortUnused(tlsChildPort);
                       }
 
+                      getTracker().returnStorageSlot(storageSlot);
+
                       try {
                         if (!stopping && taskDir.exists()) {
                           FileUtils.deleteDirectory(taskDir);
@@ -495,6 +449,7 @@ public class ForkingTaskRunner
                     }
                   }
                 }
+
               }
             )
           )
@@ -510,9 +465,7 @@ public class ForkingTaskRunner
     return new ProcessHolder(
         new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
         logFile,
-        taskLocation.getHost(),
-        taskLocation.getPort(),
-        taskLocation.getTlsPort()
+        taskLocation
     );
   }
 
@@ -851,7 +804,7 @@ public class ForkingTaskRunner
       if (processHolder == null) {
         return TaskLocation.unknown();
       } else {
-        return TaskLocation.create(processHolder.host, processHolder.port, processHolder.tlsPort);
+        return processHolder.location;
       }
     }
 
@@ -868,33 +821,26 @@ public class ForkingTaskRunner
     }
   }
 
-  @VisibleForTesting
-  static class ProcessHolder
+  public static class ProcessHolder
   {
     private final Process process;
     private final File logFile;
-    private final String host;
-    private final int port;
-    private final int tlsPort;
+    private final TaskLocation location;
 
-    private ProcessHolder(Process process, File logFile, String host, int port, int tlsPort)
+    public ProcessHolder(Process process, File logFile, TaskLocation location)
     {
       this.process = process;
       this.logFile = logFile;
-      this.host = host;
-      this.port = port;
-      this.tlsPort = tlsPort;
+      this.location = location;
     }
 
-    @VisibleForTesting
-    void registerWithCloser(Closer closer)
+    private void registerWithCloser(Closer closer)
     {
       closer.register(process.getInputStream());
       closer.register(process.getOutputStream());
     }
 
-    @VisibleForTesting
-    void shutdown()
+    private void shutdown()
     {
       process.destroy();
     }
@@ -924,5 +870,50 @@ public class ForkingTaskRunner
     }
     return maxAttempt + 1;
   }
+
+  public static class CommandListBuilder
+  {
+    ArrayList<String> commandList = new ArrayList<>();
+
+    public CommandListBuilder add(String arg)
+    {
+      commandList.add(arg);
+      return this;
+    }
+
+    public CommandListBuilder addSystemProperty(String property, int value)
+    {
+      return addSystemProperty(property, String.valueOf(value));
+    }
+
+    public CommandListBuilder addSystemProperty(String property, long value)
+    {
+      return addSystemProperty(property, String.valueOf(value));
+    }
+
+    public CommandListBuilder addSystemProperty(String property, boolean value)
+    {
+      return addSystemProperty(property, String.valueOf(value));
+    }
+
+    public CommandListBuilder addSystemProperty(String property, String value)
+    {
+      return add(StringUtils.format("-D%s=%s", property, value));
+    }
+
+    public CommandListBuilder addAll(Iterable<String> args)
+    {
+      for (String arg : args) {
+        add(arg);
+      }
+      return this;
+    }
+
+    public ArrayList<String> getCommandList()
+    {
+      return commandList;
+    }
+
+  }
 }
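
A small usage sketch of the new CommandListBuilder above; the argument values
are invented:

    import org.apache.druid.indexing.overlord.ForkingTaskRunner;

    import java.util.List;

    public class CommandListBuilderExample
    {
      public static void main(String[] args)
      {
        // The builder replaces the repeated StringUtils.format("-D%s=%s", ...)
        // calls in the old code and keeps the overloads type-safe.
        final ForkingTaskRunner.CommandListBuilder command =
            new ForkingTaskRunner.CommandListBuilder();
        command.add("java")
               .addSystemProperty("druid.host", "localhost")
               .addSystemProperty("druid.plaintextPort", 8100)
               .addSystemProperty("attemptId", "1");

        final List<String> fullCommand = command.getCommandList();
        // [java, -Ddruid.host=localhost, -Ddruid.plaintextPort=8100, -DattemptId=1]
        System.out.println(fullCommand);
      }
    }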
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
index e4033e0aa9..55dba71622 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ThreadingTaskRunner.java
@@ -155,9 +155,9 @@ public class ThreadingTaskRunner
                         @Override
                         public TaskStatus call()
                         {
-                          final File baseDirForTask;
+                          final TaskStorageDirTracker.StorageSlot storageSlot;
                           try {
-                            baseDirForTask = getTracker().pickBaseDir(task.getId());
+                            storageSlot = getTracker().pickStorageSlot(task.getId());
                           }
                           catch (RuntimeException e) {
                             LOG.error(e, "Failed to get directory for task [%s], cannot schedule.", task.getId());
@@ -167,7 +167,7 @@ public class ThreadingTaskRunner
                             );
 
                           }
-                          final File taskDir = new File(baseDirForTask, task.getId());
+                          final File taskDir = new File(storageSlot.getDirectory(), task.getId());
 
                           final String attemptUUID = UUID.randomUUID().toString();
                           final File attemptDir = new File(taskDir, attemptUUID);
@@ -212,7 +212,11 @@ public class ThreadingTaskRunner
                                   .setName(StringUtils.format("[%s]-%s", task.getId(), priorThreadName));
 
                             TaskStatus taskStatus;
-                            final TaskToolbox toolbox = toolboxFactory.build(baseDirForTask, task);
+                            final TaskToolbox toolbox = toolboxFactory.build(
+                                config -> config.withBaseTaskDir(storageSlot.getDirectory())
+                                                .withTmpStorageBytesPerTask(storageSlot.getNumBytes()),
+                                task
+                            );
                             TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);
                             TaskRunnerUtils.notifyStatusChanged(
                                 listeners,
@@ -225,10 +229,13 @@ public class ThreadingTaskRunner
                               taskStatus = task.run(toolbox);
                             }
                             catch (Throwable t) {
-                              LOGGER.error(t, "Exception caught while running the task.");
+                              LOGGER.error(t, "Exception caught while running task [%s].", task.getId());
                               taskStatus = TaskStatus.failure(
                                   task.getId(),
-                                  "Failed with an exception. See indexer logs for more details."
+                                  StringUtils.format(
+                                      "Failed with exception [%s]. See indexer logs for details.",
+                                      t.getMessage()
+                                  )
                               );
                             }
                             finally {
@@ -258,6 +265,8 @@ public class ThreadingTaskRunner
                                 }
                               }
 
+                              getTracker().returnStorageSlot(storageSlot);
+
                               try {
                                 if (!stopping && taskDir.exists()) {
                                   FileUtils.deleteDirectory(taskDir);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
index adcbff8000..602033c796 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskStorageDirTrackerTest.java
@@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.indexing.common.config.TaskConfigBuilder;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -30,8 +32,9 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class TaskStorageDirTrackerTest
@@ -48,45 +51,66 @@ public class TaskStorageDirTrackerTest
         new File(tmpFolder, "B"),
         new File(tmpFolder, "C")
     );
+    final int workerCapacity = 7;
+    final int baseTaskDirSize = 100_000_000;
 
-    final TaskStorageDirTracker tracker = new TaskStorageDirTracker(files);
+    final TaskStorageDirTracker tracker = TaskStorageDirTracker.fromBaseDirs(files, workerCapacity, baseTaskDirSize);
     tracker.ensureDirectories();
 
     validateRoundRobinAllocation(tmpFolder, tracker);
-    for (File file : Objects.requireNonNull(tmpFolder.listFiles())) {
-      FileUtils.deleteDirectory(file);
-    }
+    StorageSlotVerifier verifier = new StorageSlotVerifier(tmpFolder).setExpectedSize(33_333_333L);
+    verifier.validateException(tracker, "eighth-task");
+
+    tracker.returnStorageSlot(tracker.pickStorageSlot("task3"));
+    verifier.validate(tracker.pickStorageSlot("eighth-task"), "A", "slot2");
 
     final TaskStorageDirTracker otherTracker = TaskStorageDirTracker.fromConfigs(
-        new WorkerConfig()
-        {
-          @Override
-          public List<String> getBaseTaskDirs()
-          {
-            return files.stream().map(File::toString).collect(Collectors.toList());
-          }
-        },
+        new WorkerConfig().cloneBuilder()
+                          .setCapacity(workerCapacity)
+                          .setBaseTaskDirSize(baseTaskDirSize)
+                          .setBaseTaskDirs(files.stream().map(File::toString).collect(Collectors.toList()))
+                          .build(),
         null
     );
     otherTracker.ensureDirectories();
     validateRoundRobinAllocation(tmpFolder, otherTracker);
+    verifier = new StorageSlotVerifier(tmpFolder).setExpectedSize(33_333_333L);
+    verifier.validateException(otherTracker, "eighth-task");
+
+    otherTracker.returnStorageSlot(otherTracker.pickStorageSlot("task3"));
+    verifier.validate(otherTracker.pickStorageSlot("eighth-task"), "A", "slot2");
+
+    final IAE iae = Assert.assertThrows(
+        IAE.class,
+        () -> tracker.returnStorageSlot(otherTracker.pickStorageSlot("eighth-task"))
+    );
+    Assert.assertEquals(
+        "Cannot return storage slot for task [eighth-task] that I don't own.",
+        iae.getMessage()
+    );
   }
 
-  private void validateRoundRobinAllocation(File tmpFolder, TaskStorageDirTracker dirTracker) throws IOException
+  private void validateRoundRobinAllocation(File tmpFolder, TaskStorageDirTracker dirTracker)
   {
     // Test round-robin allocation, it starts from "C" and goes "backwards" because the counter is initialized
     // negatively, which modulos to 2 -> 1 -> 0
-    Assert.assertEquals(new File(tmpFolder, "C").toString(), dirTracker.pickBaseDir("task0").getPath());
-    Assert.assertEquals(new File(tmpFolder, "B").toString(), dirTracker.pickBaseDir("task1").getPath());
-    Assert.assertEquals(new File(tmpFolder, "A").toString(), dirTracker.pickBaseDir("task2").getPath());
-    Assert.assertEquals(new File(tmpFolder, "C").toString(), dirTracker.pickBaseDir("task3").getPath());
-    Assert.assertEquals(new File(tmpFolder, "B").toString(), dirTracker.pickBaseDir("task4").getPath());
-    Assert.assertEquals(new File(tmpFolder, "A").toString(), dirTracker.pickBaseDir("task5").getPath());
-
-    // Test that the result is always the same
-    FileUtils.mkdirp(new File(new File(tmpFolder, "C"), "task0"));
+    StorageSlotVerifier verifier = new StorageSlotVerifier(tmpFolder).setExpectedSize(33_333_333L);
+    verifier.validate(dirTracker.pickStorageSlot("task0"), "C", "slot0");
+    verifier.validate(dirTracker.pickStorageSlot("task1"), "B", "slot0");
+    verifier.validate(dirTracker.pickStorageSlot("task2"), "A", "slot0");
+    verifier.validate(dirTracker.pickStorageSlot("task3"), "A", "slot2");
+    verifier.validate(dirTracker.pickStorageSlot("task4"), "C", "slot1");
+    verifier.validate(dirTracker.pickStorageSlot("task5"), "B", "slot1");
+    verifier.validate(dirTracker.pickStorageSlot("task6"), "A", "slot1");
+
     for (int i = 0; i < 10; i++) {
-      Assert.assertEquals(new File(tmpFolder, "C").toString(), dirTracker.pickBaseDir("task0").getPath());
+      verifier.validate(dirTracker.pickStorageSlot("task0"), "C", "slot0");
+      verifier.validate(dirTracker.pickStorageSlot("task1"), "B", "slot0");
+      verifier.validate(dirTracker.pickStorageSlot("task2"), "A", "slot0");
+      verifier.validate(dirTracker.pickStorageSlot("task3"), "A", "slot2");
+      verifier.validate(dirTracker.pickStorageSlot("task4"), "C", "slot1");
+      verifier.validate(dirTracker.pickStorageSlot("task5"), "B", "slot1");
+      verifier.validate(dirTracker.pickStorageSlot("task6"), "A", "slot1");
     }
   }
 
@@ -95,17 +119,158 @@ public class TaskStorageDirTrackerTest
   {
     final File baseDir = new File(TMP.newFolder(), "A");
     final TaskStorageDirTracker tracker = TaskStorageDirTracker.fromConfigs(
-        new WorkerConfig(),
+        new WorkerConfig()
+        {
+          @Override
+          public int getCapacity()
+          {
+            return 6;
+          }
+
+          @Override
+          public long getBaseTaskDirSize()
+          {
+            return 60_000_000;
+          }
+        },
+
         new TaskConfigBuilder().setBaseDir(baseDir.toString()).build()
     );
     tracker.ensureDirectories();
 
-    final String expected = new File(baseDir, "persistent/task").toString();
-    Assert.assertEquals(expected, tracker.pickBaseDir("task0").getPath());
-    Assert.assertEquals(expected, tracker.pickBaseDir("task1").getPath());
-    Assert.assertEquals(expected, tracker.pickBaseDir("task2").getPath());
-    Assert.assertEquals(expected, tracker.pickBaseDir("task3").getPath());
-    Assert.assertEquals(expected, tracker.pickBaseDir("task1").getPath());
-    Assert.assertEquals(expected, tracker.pickBaseDir("task10293721").getPath());
+    StorageSlotVerifier verifier = new StorageSlotVerifier(new File(baseDir, "persistent/task"))
+        .setExpectedSize(10_000_000L);
+    verifier.validate(tracker.pickStorageSlot("task0"), "slot2");
+    verifier.validate(tracker.pickStorageSlot("task1"), "slot1");
+    verifier.validate(tracker.pickStorageSlot("task2"), "slot0");
+    verifier.validate(tracker.pickStorageSlot("task3"), "slot5");
+    verifier.validate(tracker.pickStorageSlot("task4"), "slot4");
+    verifier.validate(tracker.pickStorageSlot("task10293721"), "slot3");
+
+    Assert.assertThrows(
+        ISE.class,
+        () -> tracker.pickStorageSlot("seventh-task")
+    );
+  }
+
+  @Test
+  public void testMoreDirectoriesThanSlots() throws IOException
+  {
+    File tmpFolder = TMP.newFolder();
+    List<File> files = ImmutableList.of(
+        new File(tmpFolder, "A"),
+        new File(tmpFolder, "B"),
+        new File(tmpFolder, "C")
+    );
+    final int workerCapacity = 2;
+    final int baseTaskDirSize = 100_000_000;
+
+    final TaskStorageDirTracker tracker = TaskStorageDirTracker.fromConfigs(
+        new WorkerConfig().cloneBuilder()
+                          .setCapacity(workerCapacity)
+                          .setBaseTaskDirSize(baseTaskDirSize)
+                          .setBaseTaskDirs(files.stream().map(File::toString).collect(Collectors.toList()))
+                          .build(),
+        null
+    );
+    tracker.ensureDirectories();
+    StorageSlotVerifier verifier = new StorageSlotVerifier(tmpFolder).setExpectedSize(100_000_000L);
+    verifier.validate(tracker.pickStorageSlot("task1"), "A", "slot0");
+    verifier.validate(tracker.pickStorageSlot("task2"), "B", "slot0");
+    Assert.assertThrows(
+        ISE.class,
+        () -> tracker.pickStorageSlot("third-task")
+    );
+  }
+
+  @Test
+  public void testMigration() throws IOException
+  {
+    File tmpFolder = TMP.newFolder();
+    List<File> files = ImmutableList.of(new File(tmpFolder, "A"), new File(tmpFolder, "B"));
+
+    TaskStorageDirTracker tracker = TaskStorageDirTracker.fromBaseDirs(files, 4, 100_000_000L);
+    tracker.ensureDirectories();
+
+    // First, set a baseline of what would happen without anything pre-defined.
+    StorageSlotVerifier verifier = new StorageSlotVerifier(tmpFolder).setExpectedSize(50_000_000L);
+    verifier.validate(tracker.pickStorageSlot("task1"), "A", "slot0");
+    verifier.validate(tracker.pickStorageSlot("task2"), "B", "slot1");
+    verifier.validate(tracker.pickStorageSlot("task3"), "A", "slot1");
+    verifier.validate(tracker.pickStorageSlot("task4"), "B", "slot0");
+    verifier.validateException(tracker, "fifth-task");
+
+
+    // Now, clean things up and start with some tasks for migration
+    FileUtils.deleteDirectory(tmpFolder);
+    FileUtils.mkdirp(new File(files.get(0), "task1"));
+    FileUtils.mkdirp(new File(files.get(1), "task3"));
+    FileUtils.mkdirp(new File(new File(files.get(0), "slot0"), "task2"));
+
+
+    tracker = TaskStorageDirTracker.fromBaseDirs(files, 4, 100_000_000L);
+    tracker.ensureDirectories();
+
+    // Remove "A/slot1" so that task3 cannot be moved
+    FileUtils.deleteDirectory(new File(files.get(0), "slot1"));
+
+    final Map<String, TaskStorageDirTracker.StorageSlot> dirs = tracker.findExistingTaskDirs(
+        Arrays.asList("task1", "task2", "task3", "task4")
+    );
+
+    Assert.assertNull(dirs.get("task3"));
+    Assert.assertNull(dirs.get("task4"));
+
+    // Re-create the dirs so that we can actually pick stuff again.
+    tracker.ensureDirectories();
+
+    verifier.validate(dirs.get("task1"), "B", "slot1");
+    verifier.validate(dirs.get("task2"), "A", "slot0");
+    verifier.validate(tracker.pickStorageSlot("task4"), "B", "slot0");
+    verifier.validate(tracker.pickStorageSlot("task5"), "A", "slot1");
+    verifier.validateException(tracker, "fifth-task");
+
+    verifier.validate(tracker.pickStorageSlot("task1"), "B", "slot1");
+    verifier.validate(tracker.pickStorageSlot("task2"), "A", "slot0");
+    verifier.validate(tracker.pickStorageSlot("task4"), "B", "slot0");
+    verifier.validate(tracker.pickStorageSlot("task5"), "A", "slot1");
+  }
+
+  public static class StorageSlotVerifier
+  {
+    private final File baseDir;
+    private Long expectedSize;
+
+    public StorageSlotVerifier(File baseDir)
+    {
+      this.baseDir = baseDir;
+    }
+
+    public StorageSlotVerifier setExpectedSize(Long expectedSize)
+    {
+      this.expectedSize = expectedSize;
+      return this;
+    }
+
+    public TaskStorageDirTracker.StorageSlot validate(TaskStorageDirTracker.StorageSlot slot, String... dirs)
+    {
+      File theFile = baseDir;
+      for (String dir : dirs) {
+        theFile = new File(theFile, dir);
+      }
+      Assert.assertEquals(theFile, slot.getDirectory());
+      if (expectedSize != null) {
+        Assert.assertEquals(expectedSize.longValue(), slot.getNumBytes());
+      }
+      return slot;
+    }
+
+    public void validateException(TaskStorageDirTracker tracker, String taskId)
+    {
+      Assert.assertThrows(
+          ISE.class,
+          () -> tracker.pickStorageSlot(taskId)
+      );
+    }
   }
 }
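
The expected sizes asserted above follow from splitting baseTaskDirSize evenly
across the slots assigned to each base directory. A minimal sketch of that
arithmetic, inferred from the test expectations (slotSizeBytes is an
illustrative helper, not the actual TaskStorageDirTracker internals):

    // Sizing implied by the assertions: each slot gets
    // baseTaskDirSize / ceil(capacity / numBaseDirs) bytes, and the total
    // number of slots across all dirs is capped at capacity.
    static long slotSizeBytes(long baseTaskDirSize, int capacity, int numBaseDirs)
    {
      int slotsPerDir = (capacity + numBaseDirs - 1) / numBaseDirs; // ceil division
      return baseTaskDirSize / slotsPerDir;
    }

    // capacity 7, 3 dirs, 100_000_000 bytes -> 3 slots/dir of 33_333_333 bytes
    // capacity 6, 1 dir,   60_000_000 bytes -> 6 slots     of 10_000_000 bytes
    // capacity 2, 3 dirs, 100_000_000 bytes -> 1 slot/dir  of 100_000_000 bytes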
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
index bdb7c05b27..18980771ef 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java
@@ -36,6 +36,8 @@ import org.apache.druid.indexing.overlord.config.ForkingTaskRunnerConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.log.StartupLoggingConfig;
 import org.apache.druid.tasklogs.NoopTaskLogs;
@@ -45,11 +47,14 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
 
+import javax.annotation.Nonnull;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Properties;
@@ -168,7 +173,8 @@ public class ForkingTaskRunnerTest
             "java -cp",
             "/path/to/somewhere:some-jars.jar",
             "/some===file",
-            "/asecretFileNa=me", // this should not be masked but there is not way to know this not a property and probably this is an unrealistic scenario anyways
+            "/asecretFileNa=me",
+            // this should not be masked, but there is no way to know it is not a property; this is probably an unrealistic scenario anyway
             "-Dsome.property=random",
             "-Dsome.otherproperty = random=random",
             "-Dsome.somesecret = secretvalue",
@@ -195,31 +201,28 @@ public class ForkingTaskRunnerTest
   {
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .build();
+    final WorkerConfig workerConfig = new WorkerConfig();
     ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
         new ForkingTaskRunnerConfig(),
         taskConfig,
-        new WorkerConfig(),
+        workerConfig,
         new Properties(),
         new NoopTaskLogs(),
         new DefaultObjectMapper(),
         new DruidNode("middleManager", "host", false, 8091, null, true, false),
         new StartupLoggingConfig(),
-        TaskStorageDirTracker.fromConfigs(null, taskConfig)
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
     )
     {
       @Override
       ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
       {
-        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
-        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
-        Mockito.doNothing().when(processHolder).shutdown();
-        return processHolder;
+        return makeTestProcessHolder(logFile, taskLocation);
       }
 
       @Override
       int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
       {
-        WorkerConfig workerConfig = new WorkerConfig();
         Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
         Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
         Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
@@ -250,40 +253,38 @@ public class ForkingTaskRunnerTest
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .setBaseTaskDir(file.toString())
         .build();
+    final WorkerConfig workerConfig = new WorkerConfig();
     ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
         new ForkingTaskRunnerConfig(),
         taskConfig,
-        new WorkerConfig(),
+        workerConfig,
         new Properties(),
         new NoopTaskLogs(),
         mapper,
         new DruidNode("middleManager", "host", false, 8091, null, true, false),
         new StartupLoggingConfig(),
-        TaskStorageDirTracker.fromConfigs(null, taskConfig)
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
     )
     {
       @Override
       ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
       {
-        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
-        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
-        Mockito.doNothing().when(processHolder).shutdown();
-
         for (String param : command) {
           if (param.endsWith(task.getId())) {
-            File resultFile = Paths.get(getTracker().findExistingTaskDir(task.getId()).getAbsolutePath(), "attempt", "1", "status.json").toFile();
+            // pickStorageSlot should return the same slot that ForkingTaskRunner already picked
+            final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
+            File resultFile = Paths.get(basePath, task.getId(), "attempt", "1", "status.json").toFile();
             mapper.writeValue(resultFile, TaskStatus.success(task.getId()));
             break;
           }
         }
 
-        return processHolder;
+        return makeTestProcessHolder(logFile, taskLocation);
       }
 
       @Override
       int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
       {
-        WorkerConfig workerConfig = new WorkerConfig();
         Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
         Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
         Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
@@ -310,11 +311,12 @@ public class ForkingTaskRunnerTest
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .setBaseTaskDir(file.toString())
         .build();
-    TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromConfigs(null, taskConfig);
+    final WorkerConfig workerConfig = new WorkerConfig();
+    TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig);
     ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
         new ForkingTaskRunnerConfig(),
         taskConfig,
-        new WorkerConfig(),
+        workerConfig,
         new Properties(),
         new NoopTaskLogs(),
         mapper,
@@ -326,19 +328,16 @@ public class ForkingTaskRunnerTest
       @Override
       ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation) throws IOException
       {
-        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
-        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
-        Mockito.doNothing().when(processHolder).shutdown();
-
         for (String param : command) {
           if (param.endsWith(task.getId())) {
-            File resultFile = Paths.get(dirTracker.findExistingTaskDir(task.getId()).getAbsolutePath(), "attempt", "1", "status.json").toFile();
+            final String basePath = getTracker().pickStorageSlot(task.getId()).getDirectory().getAbsolutePath();
+            File resultFile = Paths.get(basePath, task.getId(), "attempt", "1", "status.json").toFile();
             mapper.writeValue(resultFile, TaskStatus.failure(task.getId(), "task failure test"));
             break;
           }
         }
 
-        return processHolder;
+        return makeTestProcessHolder(logFile, taskLocation);
       }
 
       @Override
@@ -361,11 +360,20 @@ public class ForkingTaskRunnerTest
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .setBaseTaskDir(file.toString())
         .build();
-    TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromConfigs(null, taskConfig);
+    TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromConfigs(new WorkerConfig(), taskConfig);
     String taskId = "foo";
-    assertEquals(1, ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickBaseDir(taskId), taskId)));
-    assertEquals(2, ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickBaseDir(taskId), taskId)));
-    assertEquals(3, ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickBaseDir(taskId), taskId)));
+    assertEquals(
+        1,
+        ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickStorageSlot(taskId).getDirectory(), taskId))
+    );
+    assertEquals(
+        2,
+        ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickStorageSlot(taskId).getDirectory(), taskId))
+    );
+    assertEquals(
+        3,
+        ForkingTaskRunner.getNextAttemptID(new File(dirTracker.pickStorageSlot(taskId).getDirectory(), taskId))
+    );
   }
 
   @Test
@@ -392,16 +400,17 @@ public class ForkingTaskRunnerTest
     final AtomicInteger xmxJavaOptsArrayIndex = new AtomicInteger(-1);
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .build();
+    final WorkerConfig workerConfig = new WorkerConfig();
     ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
         new ForkingTaskRunnerConfig(),
         taskConfig,
-        new WorkerConfig(),
+        workerConfig,
         new Properties(),
         new NoopTaskLogs(),
         mapper,
         new DruidNode("middleManager", "host", false, 8091, null, true, false),
         new StartupLoggingConfig(),
-        TaskStorageDirTracker.fromConfigs(null, taskConfig)
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
     )
     {
       @Override
@@ -410,7 +419,7 @@ public class ForkingTaskRunnerTest
         xmxJavaOptsIndex.set(command.indexOf("-Xmx1g"));
         xmxJavaOptsArrayIndex.set(command.indexOf("-Xmx10g"));
 
-        return Mockito.mock(ProcessHolder.class);
+        return makeTestProcessHolder(logFile, taskLocation);
       }
 
       @Override
@@ -447,28 +456,28 @@ public class ForkingTaskRunnerTest
     final Task task = OBJECT_MAPPER.readValue(taskContent, NoopTask.class);
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .build();
+    final WorkerConfig workerConfig = new WorkerConfig();
     ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
         new ForkingTaskRunnerConfig(),
         taskConfig,
-        new WorkerConfig(),
+        workerConfig,
         new Properties(),
         new NoopTaskLogs(),
         mapper,
         new DruidNode("middleManager", "host", false, 8091, null, true, false),
         new StartupLoggingConfig(),
-        TaskStorageDirTracker.fromConfigs(null, taskConfig)
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
     )
     {
       @Override
       ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
       {
-        return Mockito.mock(ProcessHolder.class);
+        return makeTestProcessHolder(logFile, taskLocation);
       }
 
       @Override
       int waitForTaskProcessToComplete(Task task, ProcessHolder processHolder, File logFile, File reportsFile)
       {
-        WorkerConfig workerConfig = new WorkerConfig();
         Assert.assertEquals(1L, (long) this.getWorkerUsedTaskSlotCount());
         Assert.assertEquals(workerConfig.getCapacity(), (long) this.getWorkerTotalTaskSlotCount());
         Assert.assertEquals(workerConfig.getCapacity() - 1, (long) this.getWorkerIdleTaskSlotCount());
@@ -481,7 +490,9 @@ public class ForkingTaskRunnerTest
     forkingTaskRunner.setNumProcessorsPerTask();
     ExecutionException e = Assert.assertThrows(ExecutionException.class, () -> forkingTaskRunner.run(task).get());
     Assert.assertTrue(e.getMessage().endsWith(ForkingTaskRunnerConfig.JAVA_OPTS_ARRAY_PROPERTY
-                                              + " in context of task: " + task.getId() + " must be an array of strings.")
+                                              + " in context of task: "
+                                              + task.getId()
+                                              + " must be an array of strings.")
     );
     Assert.assertEquals(1L, (long) forkingTaskRunner.getWorkerFailedTaskCount());
     Assert.assertEquals(0L, (long) forkingTaskRunner.getWorkerSuccessfulTaskCount());
@@ -493,11 +504,13 @@ public class ForkingTaskRunnerTest
     TaskConfig taskConfig = makeDefaultTaskConfigBuilder()
         .build();
 
-    TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(
+    TaskStorageDirTracker dirTracker = TaskStorageDirTracker.fromBaseDirs(
         ImmutableList.of(
             temporaryFolder.newFolder().getAbsoluteFile(),
             temporaryFolder.newFolder().getAbsoluteFile()
-        )
+        ),
+        1,
+        100_000_000_000_000_000L
     );
     ForkingTaskRunner forkingTaskRunner = new ForkingTaskRunner(
         new ForkingTaskRunnerConfig(),
@@ -514,12 +527,8 @@ public class ForkingTaskRunnerTest
       @Override
       ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation taskLocation)
       {
-        ProcessHolder processHolder = Mockito.mock(ProcessHolder.class);
-        Mockito.doNothing().when(processHolder).registerWithCloser(ArgumentMatchers.any());
-        Mockito.doNothing().when(processHolder).shutdown();
-        return processHolder;
+        return makeTestProcessHolder(logFile, taskLocation);
       }
-
     };
 
     forkingTaskRunner.setNumProcessorsPerTask();
@@ -537,4 +546,63 @@ public class ForkingTaskRunnerTest
         .setShuffleDataLocations(ImmutableList.of())
         .setBatchProcessingMode(TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name());
   }
+
+  @Nonnull
+  private ForkingTaskRunner.ProcessHolder makeTestProcessHolder(File logFile, TaskLocation taskLocation)
+  {
+    return new ForkingTaskRunner.ProcessHolder(new MockTestProcess(), logFile, taskLocation);
+  }
+
+  private static class MockTestProcess extends Process
+  {
+
+    private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    private final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[1024]);
+    private final ByteArrayInputStream errorStream = new ByteArrayInputStream(new byte[1024]);
+
+    @Override
+    public OutputStream getOutputStream()
+    {
+      return outputStream;
+    }
+
+    @Override
+    public InputStream getInputStream()
+    {
+      return inputStream;
+    }
+
+    @Override
+    public InputStream getErrorStream()
+    {
+      return errorStream;
+    }
+
+    @Override
+    public int waitFor()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int exitValue()
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void destroy()
+    {
+      final Closer closer = Closer.create();
+      closer.register(outputStream);
+      closer.register(inputStream);
+      closer.register(errorStream);
+      try {
+        closer.close();
+      }
+      catch (IOException e) {
+        throw new RE(e);
+      }
+    }
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
new file mode 100644
index 0000000000..77491f7d4b
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskToolboxFactory.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Provider;
+import org.apache.druid.client.cache.Cache;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.discovery.DataNodeService;
+import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.discovery.LookupNodeService;
+import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
+import org.apache.druid.indexing.common.TaskReportFileWriter;
+import org.apache.druid.indexing.common.TaskToolboxFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
+import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider;
+import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
+import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.metrics.MonitorScheduler;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.rpc.indexing.OverlordClient;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9Factory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.join.JoinableFactory;
+import org.apache.druid.segment.loading.DataSegmentArchiver;
+import org.apache.druid.segment.loading.DataSegmentKiller;
+import org.apache.druid.segment.loading.DataSegmentMover;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.coordination.DataSegmentAnnouncer;
+import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.tasklogs.TaskLogPusher;
+
+public class TestTaskToolboxFactory extends TaskToolboxFactory
+{
+  /**
+   * We use a constructor that takes a builder instead of having the builder build the object so that
+   * implementations can override methods on this class if they need to.
+   *
+   * @param bob the builder
+   */
+  public TestTaskToolboxFactory(
+      Builder bob
+  )
+  {
+    super(
+        bob.config,
+        bob.taskExecutorNode,
+        bob.taskActionClientFactory,
+        bob.emitter,
+        bob.segmentPusher,
+        bob.dataSegmentKiller,
+        bob.dataSegmentMover,
+        bob.dataSegmentArchiver,
+        bob.segmentAnnouncer,
+        bob.serverAnnouncer,
+        bob.handoffNotifierFactory,
+        bob.queryRunnerFactoryConglomerateProvider,
+        bob.queryProcessingPool,
+        bob.joinableFactory,
+        bob.monitorSchedulerProvider,
+        bob.segmentCacheManagerFactory,
+        bob.jsonMapper,
+        bob.indexIO,
+        bob.cache,
+        bob.cacheConfig,
+        bob.cachePopulatorStats,
+        bob.indexMergerV9Factory,
+        bob.druidNodeAnnouncer,
+        bob.druidNode,
+        bob.lookupNodeService,
+        bob.dataNodeService,
+        bob.taskReportFileWriter,
+        bob.intermediaryDataManager,
+        bob.authorizerMapper,
+        bob.chatHandlerProvider,
+        bob.rowIngestionMetersFactory,
+        bob.appenderatorsManager,
+        bob.overlordClient,
+        bob.coordinatorClient,
+        bob.supervisorTaskClientProvider,
+        bob.shuffleClient,
+        bob.taskLogPusher,
+        bob.attemptId
+    );
+  }
+
+  public static class Builder
+  {
+    private TaskConfig config = new TaskConfigBuilder().build();
+    private DruidNode taskExecutorNode;
+    private TaskActionClientFactory taskActionClientFactory = task -> null;
+    private ServiceEmitter emitter;
+    private DataSegmentPusher segmentPusher;
+    private DataSegmentKiller dataSegmentKiller;
+    private DataSegmentMover dataSegmentMover;
+    private DataSegmentArchiver dataSegmentArchiver;
+    private DataSegmentAnnouncer segmentAnnouncer;
+    private DataSegmentServerAnnouncer serverAnnouncer;
+    private SegmentHandoffNotifierFactory handoffNotifierFactory;
+    private Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider;
+    private QueryProcessingPool queryProcessingPool;
+    private JoinableFactory joinableFactory;
+    private Provider<MonitorScheduler> monitorSchedulerProvider;
+    private ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
+    private IndexIO indexIO = TestHelper.getTestIndexIO();
+    private SegmentCacheManagerFactory segmentCacheManagerFactory = new SegmentCacheManagerFactory(jsonMapper);
+    private Cache cache;
+    private CacheConfig cacheConfig;
+    private CachePopulatorStats cachePopulatorStats;
+    private IndexMergerV9Factory indexMergerV9Factory = new IndexMergerV9Factory(jsonMapper, indexIO, OnHeapMemorySegmentWriteOutMediumFactory.instance());
+    private DruidNodeAnnouncer druidNodeAnnouncer;
+    private DruidNode druidNode;
+    private LookupNodeService lookupNodeService;
+    private DataNodeService dataNodeService;
+    private TaskReportFileWriter taskReportFileWriter = new NoopTestTaskReportFileWriter();
+    private IntermediaryDataManager intermediaryDataManager;
+    private AuthorizerMapper authorizerMapper;
+    private ChatHandlerProvider chatHandlerProvider;
+    private RowIngestionMetersFactory rowIngestionMetersFactory;
+    private AppenderatorsManager appenderatorsManager;
+    private OverlordClient overlordClient;
+    private CoordinatorClient coordinatorClient;
+    private ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider;
+    private ShuffleClient shuffleClient;
+    private TaskLogPusher taskLogPusher;
+    private String attemptId;
+
+    public Builder setConfig(TaskConfig config)
+    {
+      this.config = config;
+      return this;
+    }
+
+    public Builder setTaskExecutorNode(DruidNode taskExecutorNode)
+    {
+      this.taskExecutorNode = taskExecutorNode;
+      return this;
+    }
+
+    public Builder setTaskActionClientFactory(TaskActionClientFactory taskActionClientFactory)
+    {
+      this.taskActionClientFactory = taskActionClientFactory;
+      return this;
+    }
+
+    public Builder setEmitter(ServiceEmitter emitter)
+    {
+      this.emitter = emitter;
+      return this;
+    }
+
+    public Builder setSegmentPusher(DataSegmentPusher segmentPusher)
+    {
+      this.segmentPusher = segmentPusher;
+      return this;
+    }
+
+    public Builder setDataSegmentKiller(DataSegmentKiller dataSegmentKiller)
+    {
+      this.dataSegmentKiller = dataSegmentKiller;
+      return this;
+    }
+
+    public Builder setDataSegmentMover(DataSegmentMover dataSegmentMover)
+    {
+      this.dataSegmentMover = dataSegmentMover;
+      return this;
+    }
+
+    public Builder setDataSegmentArchiver(DataSegmentArchiver dataSegmentArchiver)
+    {
+      this.dataSegmentArchiver = dataSegmentArchiver;
+      return this;
+    }
+
+    public Builder setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer)
+    {
+      this.segmentAnnouncer = segmentAnnouncer;
+      return this;
+    }
+
+    public Builder setServerAnnouncer(DataSegmentServerAnnouncer serverAnnouncer)
+    {
+      this.serverAnnouncer = serverAnnouncer;
+      return this;
+    }
+
+    public Builder setHandoffNotifierFactory(SegmentHandoffNotifierFactory handoffNotifierFactory)
+    {
+      this.handoffNotifierFactory = handoffNotifierFactory;
+      return this;
+    }
+
+    public Builder setQueryRunnerFactoryConglomerateProvider(Provider<QueryRunnerFactoryConglomerate> queryRunnerFactoryConglomerateProvider)
+    {
+      this.queryRunnerFactoryConglomerateProvider = queryRunnerFactoryConglomerateProvider;
+      return this;
+    }
+
+    public Builder setQueryProcessingPool(QueryProcessingPool queryProcessingPool)
+    {
+      this.queryProcessingPool = queryProcessingPool;
+      return this;
+    }
+
+    public Builder setJoinableFactory(JoinableFactory joinableFactory)
+    {
+      this.joinableFactory = joinableFactory;
+      return this;
+    }
+
+    public Builder setMonitorSchedulerProvider(Provider<MonitorScheduler> monitorSchedulerProvider)
+    {
+      this.monitorSchedulerProvider = monitorSchedulerProvider;
+      return this;
+    }
+
+    public Builder setSegmentCacheManagerFactory(SegmentCacheManagerFactory segmentCacheManagerFactory)
+    {
+      this.segmentCacheManagerFactory = segmentCacheManagerFactory;
+      return this;
+    }
+
+    public Builder setJsonMapper(ObjectMapper jsonMapper)
+    {
+      this.jsonMapper = jsonMapper;
+      return this;
+    }
+
+    public Builder setIndexIO(IndexIO indexIO)
+    {
+      this.indexIO = indexIO;
+      return this;
+    }
+
+    public Builder setCache(Cache cache)
+    {
+      this.cache = cache;
+      return this;
+    }
+
+    public Builder setCacheConfig(CacheConfig cacheConfig)
+    {
+      this.cacheConfig = cacheConfig;
+      return this;
+    }
+
+    public Builder setCachePopulatorStats(CachePopulatorStats cachePopulatorStats)
+    {
+      this.cachePopulatorStats = cachePopulatorStats;
+      return this;
+    }
+
+    public Builder setIndexMergerV9Factory(IndexMergerV9Factory indexMergerV9Factory)
+    {
+      this.indexMergerV9Factory = indexMergerV9Factory;
+      return this;
+    }
+
+    public Builder setDruidNodeAnnouncer(DruidNodeAnnouncer druidNodeAnnouncer)
+    {
+      this.druidNodeAnnouncer = druidNodeAnnouncer;
+      return this;
+    }
+
+    public Builder setDruidNode(DruidNode druidNode)
+    {
+      this.druidNode = druidNode;
+      return this;
+    }
+
+    public Builder setLookupNodeService(LookupNodeService lookupNodeService)
+    {
+      this.lookupNodeService = lookupNodeService;
+      return this;
+    }
+
+    public Builder setDataNodeService(DataNodeService dataNodeService)
+    {
+      this.dataNodeService = dataNodeService;
+      return this;
+    }
+
+    public Builder setTaskReportFileWriter(TaskReportFileWriter taskReportFileWriter)
+    {
+      this.taskReportFileWriter = taskReportFileWriter;
+      return this;
+    }
+
+    public Builder setIntermediaryDataManager(IntermediaryDataManager intermediaryDataManager)
+    {
+      this.intermediaryDataManager = intermediaryDataManager;
+      return this;
+    }
+
+    public Builder setAuthorizerMapper(AuthorizerMapper authorizerMapper)
+    {
+      this.authorizerMapper = authorizerMapper;
+      return this;
+    }
+
+    public Builder setChatHandlerProvider(ChatHandlerProvider chatHandlerProvider)
+    {
+      this.chatHandlerProvider = chatHandlerProvider;
+      return this;
+    }
+
+    public Builder setRowIngestionMetersFactory(RowIngestionMetersFactory rowIngestionMetersFactory)
+    {
+      this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+      return this;
+    }
+
+    public Builder setAppenderatorsManager(AppenderatorsManager appenderatorsManager)
+    {
+      this.appenderatorsManager = appenderatorsManager;
+      return this;
+    }
+
+    public Builder setOverlordClient(OverlordClient overlordClient)
+    {
+      this.overlordClient = overlordClient;
+      return this;
+    }
+
+    public Builder setCoordinatorClient(CoordinatorClient coordinatorClient)
+    {
+      this.coordinatorClient = coordinatorClient;
+      return this;
+    }
+
+    public Builder setSupervisorTaskClientProvider(ParallelIndexSupervisorTaskClientProvider supervisorTaskClientProvider)
+    {
+      this.supervisorTaskClientProvider = supervisorTaskClientProvider;
+      return this;
+    }
+
+    public Builder setShuffleClient(ShuffleClient shuffleClient)
+    {
+      this.shuffleClient = shuffleClient;
+      return this;
+    }
+
+    public Builder setTaskLogPusher(TaskLogPusher taskLogPusher)
+    {
+      this.taskLogPusher = taskLogPusher;
+      return this;
+    }
+
+    public Builder setAttemptId(String attemptId)
+    {
+      this.attemptId = attemptId;
+      return this;
+    }
+  }
+}
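
Every Builder field above is optional, so a test can construct a working
toolbox factory from the defaults alone and override only what it needs.
A minimal usage sketch (the setConfig call is just one example of an
override):

    TaskToolboxFactory factory = new TestTaskToolboxFactory(
        new TestTaskToolboxFactory.Builder()
            .setConfig(new TaskConfigBuilder().build())
    );

ThreadingTaskRunnerTest below uses exactly this pattern in place of its
previous Mockito-based mock.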
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
index 95d2688d6e..0200a1e957 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ThreadingTaskRunnerTest.java
@@ -35,8 +35,6 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -48,16 +46,17 @@ public class ThreadingTaskRunnerTest
   public void testTaskStatusWhenTaskThrowsExceptionWhileRunning() throws ExecutionException, InterruptedException
   {
     final TaskConfig taskConfig = ForkingTaskRunnerTest.makeDefaultTaskConfigBuilder().build();
+    final WorkerConfig workerConfig = new WorkerConfig();
     ThreadingTaskRunner runner = new ThreadingTaskRunner(
         mockTaskToolboxFactory(),
         taskConfig,
-        new WorkerConfig(),
+        workerConfig,
         new NoopTaskLogs(),
         new DefaultObjectMapper(),
         new TestAppenderatorsManager(),
         new MultipleFileTaskReportFileWriter(),
         new DruidNode("middleManager", "host", false, 8091, null, true, false),
-        TaskStorageDirTracker.fromConfigs(null, taskConfig)
+        TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig)
     );
 
     Future<TaskStatus> statusFuture = runner.run(new AbstractTask("id", "datasource", null)
@@ -89,15 +88,13 @@ public class ThreadingTaskRunnerTest
     TaskStatus status = statusFuture.get();
     Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
     Assert.assertEquals(
-        "Failed with an exception. See indexer logs for more details.",
+        "Failed with exception [Task failure test]. See indexer logs for details.",
         status.getErrorMsg()
     );
   }
 
   private static TaskToolboxFactory mockTaskToolboxFactory()
   {
-    TaskToolboxFactory factory = Mockito.mock(TaskToolboxFactory.class);
-    Mockito.when(factory.build(ArgumentMatchers.any())).thenReturn(Mockito.mock(TaskToolbox.class));
-    return factory;
+    return new TestTaskToolboxFactory(new TestTaskToolboxFactory.Builder());
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java
index bad9c891c4..1cc72346a0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/config/RemoteTaskRunnerConfigTest.java
@@ -823,7 +823,7 @@ public class RemoteTaskRunnerConfigTest
         RemoteTaskRunnerConfig.class
     );
     configProvider.inject(props, injector.getBinding(JsonConfigurator.class).getProvider().get());
-    configProvider.get().get();
+    configProvider.get();
   }
 
   @Test
@@ -844,7 +844,7 @@ public class RemoteTaskRunnerConfigTest
         RemoteTaskRunnerConfig.class
     );
     configProvider.inject(props, injector.getBinding(JsonConfigurator.class).getProvider().get());
-    configProvider.get().get();
+    configProvider.get();
   }
 
 
diff --git a/indexing-service/src/test/java/org/apache/druid/server/initialization/IndexerZkConfigTest.java b/indexing-service/src/test/java/org/apache/druid/server/initialization/IndexerZkConfigTest.java
index 8ae8217e49..92ddb1b63c 100644
--- a/indexing-service/src/test/java/org/apache/druid/server/initialization/IndexerZkConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/server/initialization/IndexerZkConfigTest.java
@@ -153,7 +153,7 @@ public class IndexerZkConfigTest
     );
     indexerZkConfig.inject(propertyValues, configurator);
 
-    Assert.assertEquals("/druid/indexer/tasks", indexerZkConfig.get().get().getTasksPath());
+    Assert.assertEquals("/druid/indexer/tasks", indexerZkConfig.get().getTasksPath());
   }
 
   @Test
@@ -175,8 +175,8 @@ public class IndexerZkConfigTest
     indexerZkConfig.inject(propertyValues, configurator);
 
 
-    IndexerZkConfig zkConfig = indexerZkConfig.get().get();
-    ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get().get();
+    IndexerZkConfig zkConfig = indexerZkConfig.get();
+    ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get();
 
     validateEntries(zkConfig);
     validateEntries(zkPathsConfig1);
@@ -207,7 +207,7 @@ public class IndexerZkConfigTest
         IndexerZkConfig.class
     );
     indexerPathsConfig.inject(propertyValues, configurator);
-    IndexerZkConfig indexerZkConfig = indexerPathsConfig.get().get();
+    IndexerZkConfig indexerZkConfig = indexerPathsConfig.get();
 
 
     // Rewind value before we potentially fail
@@ -240,7 +240,7 @@ public class IndexerZkConfigTest
 
     zkPathsConfig.inject(propertyValues, configurator);
 
-    ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get().get();
+    ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get();
 
     IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1, null, null, null, null);
 
diff --git a/processing/src/main/java/org/apache/druid/guice/JsonConfigProvider.java b/processing/src/main/java/org/apache/druid/guice/JsonConfigProvider.java
index 8c4e46933b..f3cc1860bd 100644
--- a/processing/src/main/java/org/apache/druid/guice/JsonConfigProvider.java
+++ b/processing/src/main/java/org/apache/druid/guice/JsonConfigProvider.java
@@ -79,7 +79,7 @@ import java.util.Properties;
  * @param <T> type of config object to provide.
  */
 @PublicApi
-public class JsonConfigProvider<T> implements Provider<Supplier<T>>
+public class JsonConfigProvider<T> implements Provider<T>
 {
   @SuppressWarnings("unchecked")
   public static <T> void bind(Binder binder, String propertyBase, Class<T> classToProvide)
@@ -148,8 +148,8 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
       Key<Supplier<T>> supplierKey
   )
   {
-    binder.bind(supplierKey).toProvider(of(propertyBase, clazz)).in(LazySingleton.class);
-    binder.bind(instanceKey).toProvider(new SupplierProvider<>(supplierKey));
+    binder.bind(instanceKey).toProvider(new JsonConfigProvider<>(propertyBase, clazz, null)).in(LazySingleton.class);
+    binder.bind(supplierKey).toProvider(new ProviderBasedGoogleSupplierProvider<>(instanceKey)).in(LazySingleton.class);
   }
 
   public static <T> void bind(
@@ -161,8 +161,10 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
       Key<Supplier<T>> supplierKey
   )
   {
-    binder.bind(supplierKey).toProvider(of(propertyBase, clazz, defaultClass)).in(LazySingleton.class);
-    binder.bind(instanceKey).toProvider(new SupplierProvider<>(supplierKey));
+    binder.bind(instanceKey)
+          .toProvider(new JsonConfigProvider<>(propertyBase, clazz, defaultClass))
+          .in(LazySingleton.class);
+    binder.bind(supplierKey).toProvider(new ProviderBasedGoogleSupplierProvider<>(instanceKey)).in(LazySingleton.class);
   }
 
   @SuppressWarnings("unchecked")
@@ -209,7 +211,7 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
   private Properties props;
   private JsonConfigurator configurator;
 
-  private Supplier<T> retVal = null;
+  private T retVal = null;
 
   public JsonConfigProvider(
       String propertyBase,
@@ -233,23 +235,10 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
   }
 
   @Override
-  public Supplier<T> get()
+  public T get()
   {
-    if (retVal != null) {
-      return retVal;
-    }
-
-    try {
-      final T config = configurator.configurate(props, propertyBase, classToProvide, defaultClass);
-      retVal = Suppliers.ofInstance(config);
-    }
-    catch (RuntimeException e) {
-      // When a runtime exception gets thrown out, this provider will get called again if the object is asked for again.
-      // This will have the same failed result, 'cause when it's called no parameters will have actually changed.
-      // Guice will then report the same error multiple times, which is pretty annoying. Cache a null supplier and
-      // return that instead.  This is technically enforcing a singleton, but such is life.
-      retVal = Suppliers.ofInstance(null);
-      throw e;
+    if (retVal == null) {
+      retVal = configurator.configurate(props, propertyBase, classToProvide, defaultClass);
     }
     return retVal;
   }
diff --git a/processing/src/main/java/org/apache/druid/guice/SupplierProvider.java b/processing/src/main/java/org/apache/druid/guice/ProviderBasedGoogleSupplierProvider.java
similarity index 70%
copy from processing/src/main/java/org/apache/druid/guice/SupplierProvider.java
copy to processing/src/main/java/org/apache/druid/guice/ProviderBasedGoogleSupplierProvider.java
index 107f3d92f4..8f89608c6e 100644
--- a/processing/src/main/java/org/apache/druid/guice/SupplierProvider.java
+++ b/processing/src/main/java/org/apache/druid/guice/ProviderBasedGoogleSupplierProvider.java
@@ -25,30 +25,32 @@ import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provider;
 
+
 /**
+ * A Provider of a Guava {@link Supplier} that delegates to a Guice {@link Provider} for the underlying instance.
  */
-public class SupplierProvider<T> implements Provider<T>
+public class ProviderBasedGoogleSupplierProvider<T> implements Provider<Supplier<T>>
 {
-  private final Key<Supplier<T>> supplierKey;
-
-  private Provider<Supplier<T>> supplierProvider;
+  private final Key<T> supplierKey;
+  private Provider<T> instanceProvider;
 
-  public SupplierProvider(
-      Key<Supplier<T>> supplierKey
+  public ProviderBasedGoogleSupplierProvider(
+      Key<T> instanceKey
   )
   {
-    this.supplierKey = supplierKey;
+    this.supplierKey = instanceKey;
   }
 
   @Inject
   public void configure(Injector injector)
   {
-    this.supplierProvider = injector.getProvider(supplierKey);
+    this.instanceProvider = injector.getProvider(supplierKey);
   }
 
+
   @Override
-  public T get()
+  public Supplier<T> get()
   {
-    return supplierProvider.get().get();
+    return instanceProvider::get;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/column/StringValueSetIndex.java b/processing/src/main/java/org/apache/druid/segment/column/StringValueSetIndex.java
index c53c1a9be9..8ceca2d86a 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/StringValueSetIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/StringValueSetIndex.java
@@ -30,7 +30,8 @@ import java.util.SortedSet;
 public interface StringValueSetIndex
 {
   /**
-   * Get the {@link ImmutableBitmap} corresponding to the supplied value
+   * Get the {@link ImmutableBitmap} corresponding to the supplied value.  Generates an empty bitmap when passed a
+   * value that doesn't exist.  Never returns null.
    */
   BitmapColumnIndex forValue(@Nullable String value);
 
diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryConfigTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryConfigTest.java
index 5f77874a1b..167db0fbc2 100644
--- a/processing/src/test/java/org/apache/druid/query/DefaultQueryConfigTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryConfigTest.java
@@ -45,7 +45,7 @@ public class DefaultQueryConfigTest
     properties.put(propertyPrefix + ".context.joinFilterRewriteMaxSize", "10");
     properties.put(propertyPrefix + ".context.vectorize", "true");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final DefaultQueryConfig defaultQueryConfig = provider.get().get();
+    final DefaultQueryConfig defaultQueryConfig = provider.get();
     Assert.assertNotNull(defaultQueryConfig.getContext());
     Assert.assertEquals(2, defaultQueryConfig.getContext().size());
     Assert.assertEquals("10", defaultQueryConfig.getContext().get("joinFilterRewriteMaxSize"));
@@ -63,7 +63,7 @@ public class DefaultQueryConfigTest
     );
     final Properties properties = new Properties();
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final DefaultQueryConfig defaultQueryConfig = provider.get().get();
+    final DefaultQueryConfig defaultQueryConfig = provider.get();
     Assert.assertNotNull(defaultQueryConfig.getContext());
     Assert.assertEquals(0, defaultQueryConfig.getContext().size());
   }
diff --git a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 452590011f..36b702bfb9 100644
--- a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -44,6 +44,9 @@ public class WorkerConfig
   @Min(1)
   private int capacity = Math.max(1, JvmUtils.getRuntimeInfo().getAvailableProcessors() - 1);
 
+  @JsonProperty
+  private long baseTaskDirSize = Long.MAX_VALUE;
+
   @JsonProperty
   private List<String> baseTaskDirs = null;
 
@@ -60,10 +63,10 @@ public class WorkerConfig
   private Period intermediaryPartitionTimeout = new Period("PT5M");
 
   @JsonProperty
-  private final long globalIngestionHeapLimitBytes = Runtime.getRuntime().maxMemory() / 6;
+  private long globalIngestionHeapLimitBytes = Runtime.getRuntime().maxMemory() / 6;
 
   @JsonProperty
-  private final int numConcurrentMerges = Math.max(1, capacity / 2);
+  private int numConcurrentMerges = Math.max(1, capacity / 2);
 
   public String getIp()
   {
@@ -80,11 +83,23 @@ public class WorkerConfig
     return capacity;
   }
 
+  public WorkerConfig setCapacity(int capacity)
+  {
+    this.capacity = capacity;
+    return this;
+  }
+
+  public long getBaseTaskDirSize()
+  {
+    return baseTaskDirSize;
+  }
+
   public List<String> getBaseTaskDirs()
   {
     return baseTaskDirs;
   }
 
+  @NotNull
   public String getCategory()
   {
     return category;
@@ -114,4 +129,122 @@ public class WorkerConfig
   {
     return numConcurrentMerges;
   }
+
+  public Builder cloneBuilder()
+  {
+    return new Builder(this);
+  }
+
+  public static class Builder
+  {
+    private String ip;
+    private String version;
+    private int capacity;
+    private long baseTaskDirSize;
+    private List<String> baseTaskDirs;
+    private String category;
+    private long intermediaryPartitionDiscoveryPeriodSec;
+    private long intermediaryPartitionCleanupPeriodSec;
+    private Period intermediaryPartitionTimeout;
+    private long globalIngestionHeapLimitBytes;
+    private int numConcurrentMerges;
+
+    private Builder(WorkerConfig input)
+    {
+      this.ip = input.ip;
+      this.version = input.version;
+      this.capacity = input.capacity;
+      this.baseTaskDirSize = input.baseTaskDirSize;
+      this.baseTaskDirs = input.baseTaskDirs;
+      this.category = input.category;
+      this.intermediaryPartitionDiscoveryPeriodSec = input.intermediaryPartitionDiscoveryPeriodSec;
+      this.intermediaryPartitionCleanupPeriodSec = input.intermediaryPartitionCleanupPeriodSec;
+      this.intermediaryPartitionTimeout = input.intermediaryPartitionTimeout;
+      this.globalIngestionHeapLimitBytes = input.globalIngestionHeapLimitBytes;
+      this.numConcurrentMerges = input.numConcurrentMerges;
+    }
+
+    public Builder setIp(String ip)
+    {
+      this.ip = ip;
+      return this;
+    }
+
+    public Builder setVersion(String version)
+    {
+      this.version = version;
+      return this;
+    }
+
+    public Builder setCapacity(int capacity)
+    {
+      this.capacity = capacity;
+      return this;
+    }
+
+    public Builder setBaseTaskDirSize(long baseTaskDirSize)
+    {
+      this.baseTaskDirSize = baseTaskDirSize;
+      return this;
+    }
+
+    public Builder setBaseTaskDirs(List<String> baseTaskDirs)
+    {
+      this.baseTaskDirs = baseTaskDirs;
+      return this;
+    }
+
+    public Builder setCategory(String category)
+    {
+      this.category = category;
+      return this;
+    }
+
+    public Builder setIntermediaryPartitionDiscoveryPeriodSec(long intermediaryPartitionDiscoveryPeriodSec)
+    {
+      this.intermediaryPartitionDiscoveryPeriodSec = intermediaryPartitionDiscoveryPeriodSec;
+      return this;
+    }
+
+    public Builder setIntermediaryPartitionCleanupPeriodSec(long intermediaryPartitionCleanupPeriodSec)
+    {
+      this.intermediaryPartitionCleanupPeriodSec = intermediaryPartitionCleanupPeriodSec;
+      return this;
+    }
+
+    public Builder setIntermediaryPartitionTimeout(Period intermediaryPartitionTimeout)
+    {
+      this.intermediaryPartitionTimeout = intermediaryPartitionTimeout;
+      return this;
+    }
+
+    public Builder setGlobalIngestionHeapLimitBytes(long globalIngestionHeapLimitBytes)
+    {
+      this.globalIngestionHeapLimitBytes = globalIngestionHeapLimitBytes;
+      return this;
+    }
+
+    public Builder setNumConcurrentMerges(int numConcurrentMerges)
+    {
+      this.numConcurrentMerges = numConcurrentMerges;
+      return this;
+    }
+
+    public WorkerConfig build()
+    {
+      final WorkerConfig retVal = new WorkerConfig();
+      retVal.ip = this.ip;
+      retVal.version = this.version;
+      retVal.capacity = this.capacity;
+      retVal.baseTaskDirSize = this.baseTaskDirSize;
+      retVal.baseTaskDirs = this.baseTaskDirs;
+      retVal.category = this.category;
+      retVal.intermediaryPartitionDiscoveryPeriodSec = this.intermediaryPartitionDiscoveryPeriodSec;
+      retVal.intermediaryPartitionCleanupPeriodSec = this.intermediaryPartitionCleanupPeriodSec;
+      retVal.intermediaryPartitionTimeout = this.intermediaryPartitionTimeout;
+      retVal.globalIngestionHeapLimitBytes = this.globalIngestionHeapLimitBytes;
+      retVal.numConcurrentMerges = this.numConcurrentMerges;
+      return retVal;
+    }
+  }
 }
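
WorkerConfig was previously configurable only through property injection; the
new builder makes the added knobs settable from test code as well. A minimal
sketch, mirroring how the tests above construct their configs (the values and
paths are illustrative):

    WorkerConfig workerConfig = new WorkerConfig()
        .cloneBuilder()
        .setCapacity(4)
        .setBaseTaskDirSize(100_000_000L)
        .setBaseTaskDirs(ImmutableList.of("/tmp/A", "/tmp/B"))
        .build();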
diff --git a/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java b/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java
index c2e240d8e9..82f39d22dc 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/AuthenticatorMapperModule.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.initialization;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
@@ -103,11 +102,7 @@ public class AuthenticatorMapperModule implements DruidModule
         }
         authenticatorProvider.inject(adjustedProps, configurator);
 
-        Supplier<Authenticator> authenticatorSupplier = authenticatorProvider.get();
-        if (authenticatorSupplier == null) {
-          throw new ISE("Could not create authenticator with name: %s", authenticatorName);
-        }
-        Authenticator authenticator = authenticatorSupplier.get();
+        Authenticator authenticator = authenticatorProvider.get();
         authenticatorMap.put(authenticatorName, authenticator);
       }
 
diff --git a/server/src/main/java/org/apache/druid/server/initialization/AuthorizerMapperModule.java b/server/src/main/java/org/apache/druid/server/initialization/AuthorizerMapperModule.java
index 750df5eb7d..3d3507b0c3 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/AuthorizerMapperModule.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/AuthorizerMapperModule.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.server.initialization;
 
-import com.google.common.base.Supplier;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -122,11 +121,7 @@ public class AuthorizerMapperModule implements DruidModule
 
         authorizerProvider.inject(adjustedProps, configurator);
 
-        Supplier<Authorizer> authorizerSupplier = authorizerProvider.get();
-        if (authorizerSupplier == null) {
-          throw new ISE("Could not create authorizer with name: %s", authorizerName);
-        }
-        Authorizer authorizer = authorizerSupplier.get();
+        Authorizer authorizer = authorizerProvider.get();
         authorizerMap.put(authorizerName, authorizer);
       }
 
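
A recurring theme in the rest of this diff: JsonConfigProvider now hands back the configured instance from get() directly, rather than a Supplier that callers had to unwrap. A hedged before/after sketch (CacheConfig and the property prefix are chosen for illustration):

    JsonConfigProvider<CacheConfig> provider =
        JsonConfigProvider.of("druid.cache", CacheConfig.class);
    provider.inject(properties, configurator);
    // before this patch: CacheConfig config = provider.get().get();
    CacheConfig config = provider.get();

The same one-line unwrap removal repeats through the test files below. In QuerySchedulerTest and LoggingRequestLoggerProviderTest a single .get() remains after provider.get() because there the configured object is itself a provider for the thing under test.
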
diff --git a/server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java b/server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java
index d0ed06d2d6..3a0a184f1a 100644
--- a/server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java
+++ b/server/src/test/java/org/apache/druid/client/cache/CacheConfigTest.java
@@ -88,7 +88,7 @@ public class CacheConfigTest
     properties.put(PROPERTY_PREFIX + ".unCacheable", "[\"a\",\"b\"]");
 
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
 
     injector.injectMembers(config);
     Assert.assertEquals(5, config.getNumBackgroundThreads());
@@ -103,7 +103,7 @@ public class CacheConfigTest
     properties.put(PROPERTY_PREFIX + ".useCache", "false");
 
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
 
     Assert.assertEquals(99, config.getNumBackgroundThreads());
     Assert.assertEquals(false, config.isPopulateCache());
@@ -116,7 +116,7 @@ public class CacheConfigTest
     properties.put(PROPERTY_PREFIX + ".numBackgroundThreads", "-1");
 
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
     Assert.assertNotEquals(-1, config.getNumBackgroundThreads());
   }
 
@@ -126,7 +126,7 @@ public class CacheConfigTest
   {
     properties.put(PROPERTY_PREFIX + ".numBackgroundThreads", "BABBA YAGA");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
     throw new IllegalStateException("Should have already failed");
   }
 
@@ -135,7 +135,7 @@ public class CacheConfigTest
   {
     properties.put(PROPERTY_PREFIX + ".populateCache", "TRUE");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
     throw new IllegalStateException("Should have already failed");
   }
 
@@ -144,7 +144,7 @@ public class CacheConfigTest
   {
     properties.put(PROPERTY_PREFIX + ".populateCache", "FALSE");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
     throw new IllegalStateException("Should have already failed");
   }
 
@@ -154,7 +154,7 @@ public class CacheConfigTest
   {
     properties.put(PROPERTY_PREFIX + ".populateCache", "FaLse");
     configProvider.inject(properties, configurator);
-    CacheConfig config = configProvider.get().get();
+    CacheConfig config = configProvider.get();
     throw new IllegalStateException("Should have already failed");
   }
 
diff --git a/server/src/test/java/org/apache/druid/client/cache/CaffeineCacheTest.java b/server/src/test/java/org/apache/druid/client/cache/CaffeineCacheTest.java
index ed68521714..2352661b6d 100644
--- a/server/src/test/java/org/apache/druid/client/cache/CaffeineCacheTest.java
+++ b/server/src/test/java/org/apache/druid/client/cache/CaffeineCacheTest.java
@@ -397,7 +397,7 @@ public class CaffeineCacheTest
         CaffeineCacheConfig.class
     );
     caffeineCacheConfigJsonConfigProvider.inject(properties, configurator);
-    final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get();
+    final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get();
     Assert.assertEquals(10, config.getExpireAfter());
     Assert.assertEquals(100, config.getSizeInBytes());
     Assert.assertNotNull(config.createExecutor());
@@ -428,7 +428,7 @@ public class CaffeineCacheTest
         CaffeineCacheConfig.class
     );
     caffeineCacheConfigJsonConfigProvider.inject(properties, configurator);
-    final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get();
+    final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get();
     Assert.assertEquals(10, config.getExpireAfter());
     Assert.assertEquals(100, config.getSizeInBytes());
     Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor());
@@ -456,7 +456,7 @@ public class CaffeineCacheTest
         CaffeineCacheConfig.class
     );
     caffeineCacheConfigJsonConfigProvider.inject(properties, configurator);
-    final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get().get();
+    final CaffeineCacheConfig config = caffeineCacheConfigJsonConfigProvider.get();
     Assert.assertEquals(-1, config.getExpireAfter());
     Assert.assertEquals(-1L, config.getSizeInBytes());
     Assert.assertEquals(ForkJoinPool.commonPool(), config.createExecutor());
diff --git a/server/src/test/java/org/apache/druid/curator/CuratorConfigTest.java b/server/src/test/java/org/apache/druid/curator/CuratorConfigTest.java
index ef4d4fc354..ee3a953a5f 100644
--- a/server/src/test/java/org/apache/druid/curator/CuratorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/curator/CuratorConfigTest.java
@@ -36,7 +36,7 @@ public class CuratorConfigTest extends JsonConfigTesterBase<CuratorConfig>
     propertyValues.put(getPropertyKey("maxZkRetries"), "20");
     testProperties.putAll(propertyValues);
     configProvider.inject(testProperties, configurator);
-    CuratorConfig config = configProvider.get().get();
+    CuratorConfig config = configProvider.get();
     Assert.assertEquals("fooHost", config.getZkHosts());
     Assert.assertEquals(true, config.getEnableAcl());
     Assert.assertEquals("test-zk-user", config.getZkUser());
diff --git a/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBase.java b/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBase.java
index d21d9dd475..d6bbe22100 100644
--- a/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBase.java
+++ b/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBase.java
@@ -156,7 +156,7 @@ public abstract class JsonConfigTesterBase<T>
       throws IllegalAccessException, NoSuchMethodException, InvocationTargetException
   {
     configProvider.inject(testProperties, configurator);
-    validateEntries(configProvider.get().get());
+    validateEntries(configProvider.get());
     Assert.assertEquals(propertyValues.size(), assertions);
   }
 
diff --git a/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBaseTest.java b/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBaseTest.java
index 8107c2b59b..ffca0cfe4b 100644
--- a/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBaseTest.java
+++ b/server/src/test/java/org/apache/druid/guice/JsonConfigTesterBaseTest.java
@@ -58,7 +58,7 @@ public final class JsonConfigTesterBaseTest
     propertyValues.put(getPropertyKey("map"), "{\"k1\": \"v1\", \"k2\": \"v2\"}");
     testProperties.putAll(propertyValues);
     configProvider.inject(testProperties, configurator);
-    Sample results = configProvider.get().get();
+    Sample results = configProvider.get();
 
     Assert.assertEquals(1, results.getPrimitiveInt());
     Assert.assertTrue(results.getPrimitiveBoolean());
diff --git a/server/src/test/java/org/apache/druid/guice/security/DruidAuthModuleTest.java b/server/src/test/java/org/apache/druid/guice/security/DruidAuthModuleTest.java
index 92606b28ef..a3cea18f56 100644
--- a/server/src/test/java/org/apache/druid/guice/security/DruidAuthModuleTest.java
+++ b/server/src/test/java/org/apache/druid/guice/security/DruidAuthModuleTest.java
@@ -102,6 +102,6 @@ public class DruidAuthModuleTest
         AuthConfig.class
     );
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    return provider.get().get();
+    return provider.get();
   }
 }
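
A heads-up on the next hunk: despite the rename header, it is effectively a delete plus an add. Git's rename detection paired the removed SupplierProvider (obsolete now that JsonConfigProvider no longer routes through Supplier) with the unrelated new WorkerConfigTest at 55% textual similarity, so the two files show up as one rename.
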
diff --git a/processing/src/main/java/org/apache/druid/guice/SupplierProvider.java b/server/src/test/java/org/apache/druid/indexing/worker/config/WorkerConfigTest.java
similarity index 55%
rename from processing/src/main/java/org/apache/druid/guice/SupplierProvider.java
rename to server/src/test/java/org/apache/druid/indexing/worker/config/WorkerConfigTest.java
index 107f3d92f4..86288f517a 100644
--- a/processing/src/main/java/org/apache/druid/guice/SupplierProvider.java
+++ b/server/src/test/java/org/apache/druid/indexing/worker/config/WorkerConfigTest.java
@@ -17,38 +17,27 @@
  * under the License.
  */
 
-package org.apache.druid.guice;
+package org.apache.druid.indexing.worker.config;
 
-import com.google.common.base.Supplier;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Provider;
+import org.junit.Assert;
+import org.junit.Test;
 
-/**
- */
-public class SupplierProvider<T> implements Provider<T>
-{
-  private final Key<Supplier<T>> supplierKey;
-
-  private Provider<Supplier<T>> supplierProvider;
+import java.util.Arrays;
 
-  public SupplierProvider(
-      Key<Supplier<T>> supplierKey
-  )
-  {
-    this.supplierKey = supplierKey;
-  }
-
-  @Inject
-  public void configure(Injector injector)
+public class WorkerConfigTest
+{
+  @Test
+  public void testSetters()
   {
-    this.supplierProvider = injector.getProvider(supplierKey);
-  }
+    WorkerConfig config = new WorkerConfig()
+        .cloneBuilder()
+        .setCapacity(10)
+        .setBaseTaskDirSize(100_000_000L)
+        .setBaseTaskDirs(Arrays.asList("1", "2", "another"))
+        .build();
 
-  @Override
-  public T get()
-  {
-    return supplierProvider.get().get();
+    Assert.assertEquals(10, config.getCapacity());
+    Assert.assertEquals(100_000_000L, config.getBaseTaskDirSize());
+    Assert.assertEquals(Arrays.asList("1", "2", "another"), config.getBaseTaskDirs());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/initialization/ZkPathsConfigTest.java b/server/src/test/java/org/apache/druid/initialization/ZkPathsConfigTest.java
index 0d4d26de4e..70ec19064f 100644
--- a/server/src/test/java/org/apache/druid/initialization/ZkPathsConfigTest.java
+++ b/server/src/test/java/org/apache/druid/initialization/ZkPathsConfigTest.java
@@ -62,7 +62,7 @@ public class ZkPathsConfigTest extends JsonConfigTesterBase<ZkPathsConfig>
     propertyValues.put(StringUtils.format("%s.loadQueuePath", CONFIG_PREFIX), ZKPaths.makePath(base, "loadQueue"));
     propertyValues.put(StringUtils.format("%s.connectorPath", CONFIG_PREFIX), ZKPaths.makePath(base, "connector"));
 
-    ZkPathsConfig zkPathsConfigObj = zkPathsConfig.get().get();
+    ZkPathsConfig zkPathsConfigObj = zkPathsConfig.get();
     validateEntries(zkPathsConfigObj);
     Assert.assertEquals(propertyValues.size(), assertions);
 
diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java
index 24e2e60b35..a0f3bc9cca 100644
--- a/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java
+++ b/server/src/test/java/org/apache/druid/query/lookup/LookupListeningAnnouncerConfigTest.java
@@ -80,7 +80,7 @@ public class LookupListeningAnnouncerConfigTest
         LookupListeningAnnouncerConfig.class
     );
     configProvider.inject(properties, configurator);
-    final LookupListeningAnnouncerConfig config = configProvider.get().get();
+    final LookupListeningAnnouncerConfig config = configProvider.get();
     Assert.assertEquals(LookupListeningAnnouncerConfig.DEFAULT_TIER, config.getLookupTier());
   }
 
@@ -95,7 +95,7 @@ public class LookupListeningAnnouncerConfigTest
         LookupListeningAnnouncerConfig.class
     );
     configProvider.inject(properties, configurator);
-    final LookupListeningAnnouncerConfig config = configProvider.get().get();
+    final LookupListeningAnnouncerConfig config = configProvider.get();
     Assert.assertEquals(lookupTier, config.getLookupTier());
   }
 
@@ -109,7 +109,7 @@ public class LookupListeningAnnouncerConfigTest
         LookupListeningAnnouncerConfig.class
     );
     configProvider.inject(properties, configurator);
-    final LookupListeningAnnouncerConfig config = configProvider.get().get();
+    final LookupListeningAnnouncerConfig config = configProvider.get();
     config.getLookupTier();
   }
 
@@ -123,7 +123,7 @@ public class LookupListeningAnnouncerConfigTest
         LookupListeningAnnouncerConfig.class
     );
     configProvider.inject(properties, configurator);
-    final LookupListeningAnnouncerConfig config = configProvider.get().get();
+    final LookupListeningAnnouncerConfig config = configProvider.get();
     Assert.assertEquals("some_datasource", config.getLookupTier());
   }
 
@@ -139,7 +139,7 @@ public class LookupListeningAnnouncerConfigTest
         LookupListeningAnnouncerConfig.class
     );
     configProvider.inject(properties, configurator);
-    final LookupListeningAnnouncerConfig config = configProvider.get().get();
+    final LookupListeningAnnouncerConfig config = configProvider.get();
     Assert.assertEquals(lookupTier, config.getLookupTier());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 1f3acac8fc..749317d030 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -428,7 +428,7 @@ public class QuerySchedulerTest
     final Properties properties = new Properties();
     properties.setProperty(propertyPrefix + ".numThreads", "10");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
+    final QueryScheduler scheduler = provider.get().get();
     Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
     Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
@@ -449,7 +449,7 @@ public class QuerySchedulerTest
     properties.setProperty(propertyPrefix + ".laning.maxLowPercent", "20");
 
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
+    final QueryScheduler scheduler = provider.get().get();
     Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
     Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
@@ -468,7 +468,7 @@ public class QuerySchedulerTest
     final Properties properties = new Properties();
     properties.setProperty(propertyPrefix + ".laning.strategy", "hilo");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    Throwable t = Assert.assertThrows(ProvisionException.class, () -> provider.get().get().get());
+    Throwable t = Assert.assertThrows(ProvisionException.class, () -> provider.get().get());
     Assert.assertEquals(
         "Unable to provision, see the following errors:\n"
         + "\n"
@@ -497,7 +497,7 @@ public class QuerySchedulerTest
     properties.setProperty(propertyPrefix + ".prioritization.adjustment", "5");
     properties.setProperty(propertyPrefix + ".prioritization.segmentCountThreshold", "1");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
+    final QueryScheduler scheduler = provider.get().get();
     Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(2, scheduler.getLaneAvailableCapacity(HiLoQueryLaningStrategy.LOW));
     Assert.assertEquals(QueryScheduler.UNAVAILABLE, scheduler.getLaneAvailableCapacity("non-existent"));
@@ -525,7 +525,7 @@ public class QuerySchedulerTest
     final Properties properties = new Properties();
     properties.setProperty(propertyPrefix + ".prioritization.strategy", "threshold");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    Throwable t = Assert.assertThrows(ProvisionException.class, () -> provider.get().get().get());
+    Throwable t = Assert.assertThrows(ProvisionException.class, () -> provider.get().get());
     Assert.assertEquals(
         "Unable to provision, see the following errors:\n"
         + "\n"
@@ -553,7 +553,7 @@ public class QuerySchedulerTest
     properties.put(propertyPrefix + ".laning.lanes.one", "1");
     properties.put(propertyPrefix + ".laning.lanes.two", "2");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
+    final QueryScheduler scheduler = provider.get().get();
     Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one"));
     Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("two"));
@@ -576,7 +576,7 @@ public class QuerySchedulerTest
     properties.put(propertyPrefix + ".laning.lanes.one", "1");
     properties.put(propertyPrefix + ".laning.lanes.twenty", "20");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final QueryScheduler scheduler = provider.get().get().get();
+    final QueryScheduler scheduler = provider.get().get();
     Assert.assertEquals(10, scheduler.getTotalAvailableCapacity());
     Assert.assertEquals(1, scheduler.getLaneAvailableCapacity("one"));
     Assert.assertEquals(2, scheduler.getLaneAvailableCapacity("twenty"));
diff --git a/server/src/test/java/org/apache/druid/server/log/LoggingRequestLoggerProviderTest.java b/server/src/test/java/org/apache/druid/server/log/LoggingRequestLoggerProviderTest.java
index 11c7db40af..7642e88a8b 100644
--- a/server/src/test/java/org/apache/druid/server/log/LoggingRequestLoggerProviderTest.java
+++ b/server/src/test/java/org/apache/druid/server/log/LoggingRequestLoggerProviderTest.java
@@ -53,7 +53,7 @@ public class LoggingRequestLoggerProviderTest
     final Properties properties = new Properties();
     properties.put(propertyPrefix + ".type", "slf4j");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final LoggingRequestLogger requestLogger = (LoggingRequestLogger) provider.get().get().get();
+    final LoggingRequestLogger requestLogger = (LoggingRequestLogger) provider.get().get();
     Assert.assertFalse(requestLogger.isSetContextMDC());
     Assert.assertFalse(requestLogger.isSetMDC());
   }
@@ -66,7 +66,7 @@ public class LoggingRequestLoggerProviderTest
     properties.put(propertyPrefix + ".setMDC", "true");
     properties.put(propertyPrefix + ".setContextMDC", "true");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final LoggingRequestLogger requestLogger = (LoggingRequestLogger) provider.get().get().get();
+    final LoggingRequestLogger requestLogger = (LoggingRequestLogger) provider.get().get();
     Assert.assertTrue(requestLogger.isSetContextMDC());
     Assert.assertTrue(requestLogger.isSetMDC());
   }
@@ -77,7 +77,7 @@ public class LoggingRequestLoggerProviderTest
     final Properties properties = new Properties();
     properties.put(propertyPrefix + ".type", "noop");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    Assert.assertThat(provider.get().get().get(), Matchers.instanceOf(NoopRequestLogger.class));
+    Assert.assertThat(provider.get().get(), Matchers.instanceOf(NoopRequestLogger.class));
   }
 
   private Injector makeInjector()
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 5684c3871c..85d2e41f54 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.rvesse.airline.annotations.Command;
 import com.google.common.base.Predicates;
 import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
@@ -440,11 +439,7 @@ public class CliCoordinator extends ServerRunnable
               adjustedProps.put(typeProperty, dutyName);
             }
             coordinatorCustomDutyProvider.inject(adjustedProps, configurator);
-            Supplier<CoordinatorCustomDuty> coordinatorCustomDutySupplier = coordinatorCustomDutyProvider.get();
-            if (coordinatorCustomDutySupplier == null) {
-              throw new ISE("Could not create CoordinatorCustomDuty with name: %s for group: %s", dutyName, coordinatorCustomDutyGroupName);
-            }
-            CoordinatorCustomDuty coordinatorCustomDuty = coordinatorCustomDutySupplier.get();
+            CoordinatorCustomDuty coordinatorCustomDuty = coordinatorCustomDutyProvider.get();
             if (coordinatorCustomDuty == null) {
               throw new ISE("Could not create CoordinatorCustomDuty with name: %s for group: %s", dutyName, coordinatorCustomDutyGroupName);
             }
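
Worth noting in the CliCoordinator hunk above: only the redundant Supplier unwrap (and its duplicate error message) is removed; the code still guards against a null duty and throws the same ISE when provisioning fails.
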
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 0dc2460995..3c4a1c37df 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -109,6 +109,7 @@ import org.apache.druid.segment.loading.DataSegmentMover;
 import org.apache.druid.segment.loading.OmniDataSegmentArchiver;
 import org.apache.druid.segment.loading.OmniDataSegmentKiller;
 import org.apache.druid.segment.loading.OmniDataSegmentMover;
+import org.apache.druid.segment.loading.StorageLocation;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.PeonAppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -126,6 +127,7 @@ import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.DataSourceTaskIdHolder;
 import org.eclipse.jetty.server.Server;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.util.List;
@@ -254,13 +256,7 @@ public class CliPeon extends GuiceRunnable
             LifecycleModule.register(binder, Server.class);
 
             if ("true".equals(loadBroadcastSegments)) {
-              binder.bind(SegmentManager.class).in(LazySingleton.class);
-              binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
-              Jerseys.addResource(binder, HistoricalResource.class);
-
-              if (isZkEnabled) {
-                LifecycleModule.register(binder, ZkCoordinator.class);
-              }
+              binder.install(new BroadcastSegmentLoadingModule());
             }
           }
 
@@ -492,4 +488,28 @@ public class CliPeon extends GuiceRunnable
     shuffleClientBiddy.addBinding("local").to(HttpShuffleClient.class).in(LazySingleton.class);
     shuffleClientBiddy.addBinding("deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class);
   }
+
+  public class BroadcastSegmentLoadingModule implements Module
+  {
+    @Override
+    public void configure(Binder binder)
+    {
+      binder.bind(SegmentManager.class).in(LazySingleton.class);
+      binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
+      Jerseys.addResource(binder, HistoricalResource.class);
+
+      if (isZkEnabled) {
+        LifecycleModule.register(binder, ZkCoordinator.class);
+      }
+    }
+
+    @Provides
+    @LazySingleton
+    public List<StorageLocation> getCliPeonStorageLocations(TaskConfig config)
+    {
+      File broadcastStorage = new File(new File(taskDirPath, "broadcast"), "segments");
+
+      return ImmutableList.of(new StorageLocation(broadcastStorage, config.getTmpStorageBytesPerTask(), null));
+    }
+  }
 }
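
Tying the pieces together: the @Provides method above sizes a single broadcast-segment StorageLocation under the task's own directory using TaskConfig.getTmpStorageBytesPerTask(), which is fed by the WorkerConfig fields introduced earlier in this diff, and the module is only installed when CliPeon's loadBroadcastSegments flag is "true". A hedged runtime.properties sketch for a MiddleManager (property names assume the usual druid.worker.* mapping of WorkerConfig fields; values are illustrative):

    # task slots on this worker
    druid.worker.capacity=8
    # new in this patch: byte budget used to size per-task temporary storage
    druid.worker.baseTaskDirSize=100000000000
    druid.worker.baseTaskDirs=["/mnt/disk1/task","/mnt/disk2/task"]
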
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java
index 69153c8a74..2c10994c4a 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/AvaticaModuleTest.java
@@ -124,7 +124,7 @@ public class AvaticaModuleTest
     properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50000");
     properties.setProperty("druid.sql.avatica.minRowsPerFrame", "10000");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final AvaticaServerConfig config = provider.get().get();
+    final AvaticaServerConfig config = provider.get();
     Assert.assertNotNull(config);
     Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
     Assert.assertEquals(
@@ -146,7 +146,7 @@ public class AvaticaModuleTest
     );
     properties.setProperty("druid.sql.avatica.maxRowsPerFrame", "50");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final AvaticaServerConfig config = provider.get().get();
+    final AvaticaServerConfig config = provider.get();
     Assert.assertNotNull(config);
     Assert.assertEquals(AvaticaServerConfig.DEFAULT_MAX_CONNECTIONS, config.getMaxConnections());
     Assert.assertEquals(
@@ -170,7 +170,7 @@ public class AvaticaModuleTest
     );
     properties.setProperty("druid.sql.avatica.minRowsPerFrame", "-1");
     provider.inject(properties, injector.getInstance(JsonConfigurator.class));
-    final AvaticaServerConfig config = provider.get().get();
+    final AvaticaServerConfig config = provider.get();
     Assert.assertNotNull(config);
     config.getMinRowsPerFrame();
   }
diff --git a/website/.spelling b/website/.spelling
index 16d48b006a..910912d46c 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -208,6 +208,7 @@ SqlParameter
 SslContextFactory
 StatsD
 SYSTEM_TABLE
+TaskRunner
 TCP
 TGT
 TLS

