Posted to commits@druid.apache.org by za...@apache.org on 2021/08/13 20:40:48 UTC

[druid] branch master updated: option to use deep storage for storing shuffle data (#11507)

This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c7b4667  option to use deep storage for storing shuffle data (#11507)
c7b4667 is described below

commit c7b46671b311e841f7ddc3596dc4e9b4cf96b24d
Author: Parag Jain <pj...@apache.org>
AuthorDate: Sat Aug 14 02:10:25 2021 +0530

    option to use deep storage for storing shuffle data (#11507)
    
    Fixes #11297.
    
    Description and design are in proposal #11297.
    
    Key changed/added classes in this PR:
    
        * DataSegmentPusher
        * ShuffleClient
        * PartitionStat
        * PartitionLocation
        * IntermediaryDataManager
---
 .travis.yml                                        |  12 +-
 .../druid/segment/loading/DataSegmentPusher.java   |   5 +
 .../segment/loading/NoopDataSegmentPusher.java     |   6 +
 docs/configuration/index.md                        |   2 +-
 .../druid/storage/aliyun/OssDataSegmentPusher.java |   7 +-
 .../cassandra/CassandraDataSegmentPusher.java      |  11 +-
 .../cloudfiles/CloudFilesDataSegmentPusher.java    |  11 +-
 .../storage/azure/AzureDataSegmentPusher.java      |  20 ++-
 .../storage/azure/AzureDataSegmentPusherTest.java  |   6 +-
 .../storage/google/GoogleDataSegmentPusher.java    |   9 +-
 .../druid/storage/hdfs/HdfsDataSegmentPusher.java  |  30 +++--
 .../druid/storage/s3/S3DataSegmentPusher.java      |   7 +-
 .../parallel/DeepStoragePartitionLocation.java     | 122 ++++++++++++++++++
 ...tionStat.java => DeepStoragePartitionStat.java} |  73 +++++++----
 .../batch/parallel/DeepStorageShuffleClient.java   |  60 +++++++++
 .../GeneratedPartitionsMetadataReport.java         |   6 +-
 .../batch/parallel/GeneratedPartitionsReport.java  |  17 ++-
 .../batch/parallel/GenericPartitionLocation.java   | 113 ++++++++++++++++-
 .../task/batch/parallel/GenericPartitionStat.java  | 105 ++++++++++++++--
 .../task/batch/parallel/HttpShuffleClient.java     |  22 +++-
 .../parallel/ParallelIndexSupervisorTask.java      |  47 +++----
 .../PartialGenericSegmentMergeIOConfig.java        |  40 ------
 .../PartialGenericSegmentMergeIngestionSpec.java   |  38 ------
 ...GenericSegmentMergeParallelIndexTaskRunner.java |   8 +-
 .../parallel/PartialGenericSegmentMergeTask.java   |  10 +-
 ...HashSegmentGenerateParallelIndexTaskRunner.java |   2 +-
 .../parallel/PartialHashSegmentGenerateTask.java   |  18 +--
 ...angeSegmentGenerateParallelIndexTaskRunner.java |   2 +-
 .../parallel/PartialRangeSegmentGenerateTask.java  |  18 +--
 .../parallel/PartialSegmentMergeIOConfig.java      |  12 +-
 .../parallel/PartialSegmentMergeIngestionSpec.java |  10 +-
 .../batch/parallel/PartialSegmentMergeTask.java    |  37 ++----
 .../task/batch/parallel/PartitionLocation.java     | 137 +++------------------
 .../common/task/batch/parallel/PartitionStat.java  | 120 ++++--------------
 .../common/task/batch/parallel/ShuffleClient.java  |   5 +-
 .../DeepStorageIntermediaryDataManager.java        | 103 ++++++++++++++++
 .../worker/shuffle/IntermediaryDataManager.java    |  37 +++++-
 .../shuffle/LocalIntermediaryDataManager.java      |  60 ++++-----
 .../worker/shuffle/ShuffleDataSegmentPusher.java   |   5 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |  19 ++-
 ....java => DeepStoragePartitionLocationTest.java} |  39 ++++--
 ...Test.java => DeepStoragePartitionStatTest.java} |  23 ++--
 .../parallel/DeepStorageShuffleClientTest.java     | 113 +++++++++++++++++
 .../parallel/GenericPartitionLocationTest.java     |  10 ++
 .../batch/parallel/GenericPartitionStatTest.java   |  10 ++
 .../task/batch/parallel/HttpShuffleClientTest.java |  15 ++-
 .../parallel/ParallelIndexSupervisorTaskTest.java  |  89 ++++++++-----
 .../parallel/ParallelIndexTestingFactory.java      |   4 +-
 .../PartialGenericSegmentMergeTaskTest.java        |  61 ++++++---
 ...t.java => PartialSegmentMergeIOConfigTest.java} |  55 ++++++---
 ...a => PartialSegmentMergeIngestionSpecTest.java} |  48 ++++++--
 ...ocalIntermediaryDataManagerAutoCleanupTest.java |   4 +-
 ...ermediaryDataManagerManualAddAndDeleteTest.java |   6 +-
 .../shuffle/ShuffleDataSegmentPusherTest.java      | 115 ++++++++++++++---
 .../worker/shuffle/ShuffleResourceTest.java        |   4 +-
 integration-tests/docker/Dockerfile                |   4 +-
 .../docker/docker-compose.shuffle-deep-store.yml   | 105 ++++++++++++++++
 .../environment-configs/common-shuffle-deep-store  |  81 ++++++++++++
 integration-tests/script/docker_compose_args.sh    |   4 +
 .../java/org/apache/druid/tests/TestNGGroup.java   |   2 +
 .../indexer/ITPerfectRollupParallelIndexTest.java  |   2 +-
 .../segment/loading/LocalDataSegmentPusher.java    |  14 ++-
 .../org/apache/druid/cli/CliMiddleManager.java     |   2 +
 .../main/java/org/apache/druid/cli/CliPeon.java    |   4 +
 website/.spelling                                  |   2 +
 65 files changed, 1523 insertions(+), 665 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 4211f18..15b38d8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -451,6 +451,14 @@ jobs:
       name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with Indexer"
       env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
 
+    - <<: *integration_perfect_rollup_parallel_batch_index
+      name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with deep storage as intermediate store"
+      env: TESTNG_GROUPS='-Dgroups=shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
+
+    - <<: *integration_perfect_rollup_parallel_batch_index
+      name: "(Compile=openjdk8, Run=openjdk8) perfect rollup parallel batch index integration test with deep storage as intermediate store with indexer"
+      env: TESTNG_GROUPS='-Dgroups=shuffle-deep-store' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
+
     - &integration_kafka_index
       name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test"
       stage: Tests - phase 2
@@ -597,13 +605,13 @@ jobs:
       stage: Tests - phase 2
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
       script: *run_integration_test
       after_failure: *integration_test_diags
 
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer"
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep [...]
 
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests"
diff --git a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
index 98c7550..ce93bb3 100644
--- a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
+++ b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
@@ -68,6 +68,11 @@ public interface DataSegmentPusher
    */
   DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException;
 
+  default DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
   //use map instead of LoadSpec class to avoid dependency pollution.
   Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
 
diff --git a/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java
index 1af11ac..44ae968 100644
--- a/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java
+++ b/core/src/main/java/org/apache/druid/segment/loading/NoopDataSegmentPusher.java
@@ -51,6 +51,12 @@ public class NoopDataSegmentPusher implements DataSegmentPusher
   }
 
   @Override
+  public DataSegment pushToPath(File file, DataSegment segment, String storageDirSuffix)
+  {
+    return segment;
+  }
+
+  @Override
   public Map<String, Object> makeLoadSpec(URI uri)
   {
     return ImmutableMap.of();
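
The two hunks above add the new pushToPath hook to DataSegmentPusher (with a default that throws UnsupportedOperationException) and a no-op override. Below is a minimal, hypothetical sketch of the delegation pattern the later hunks apply to the OSS, Cassandra, CloudFiles, Azure, Google, HDFS and S3 pushers; MyBlobStorePusher and uploadZipTo are invented names, and the class is left abstract so the sketch does not have to restate the interface's other methods.

import java.io.File;
import java.io.IOException;

import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;

// Hypothetical sketch, not part of this commit: illustrates how a pusher implementation
// can delegate push() to the new pushToPath().
public abstract class MyBlobStorePusher implements DataSegmentPusher
{
  @Override
  public DataSegment push(File indexFilesDir, DataSegment segment, boolean useUniquePath) throws IOException
  {
    // Regular segment pushes keep the pusher's own storage-dir layout...
    return pushToPath(indexFilesDir, segment, getStorageDir(segment, useUniquePath));
  }

  @Override
  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
  {
    // ...while shuffle data arrives with a caller-chosen suffix (in this commit the
    // deep-storage intermediary data manager builds one under shuffle-data/), so the
    // upload target is derived from storageDirSuffix directly.
    return uploadZipTo(indexFilesDir, segment, storageDirSuffix);
  }

  // Stand-in for the store-specific zip-and-upload logic (see the real pushers below).
  protected abstract DataSegment uploadZipTo(File dir, DataSegment segment, String suffix) throws IOException;
}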
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 4ca0e8d..b365759 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1306,7 +1306,7 @@ Processing properties set on the Middlemanager will be passed through to Peons.
 |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require add [...]
 |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
-|`druid.processing.intermediaryData.storage.type`|Storage type for storing intermediary segments of data shuffle between native parallel index tasks. Current choice are only "local" which stores segment files in local storage of Middle Managers (or Indexer).|local|
+|`druid.processing.intermediaryData.storage.type`|Storage type for storing intermediary segments of data shuffle between native parallel index tasks. Current choices are "local" which stores segment files in local storage of Middle Managers (or Indexer) or "deepstore" which uses configured deep storage. Note - With "deepstore" type data is stored in `shuffle-data` directory under the configured deep storage path, auto clean up for this directory is not supported yet. One can setup cloud  [...]
 
 The amount of direct memory needed by Druid is at least
 `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
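
The documentation row above is the user-facing switch for this feature. For reference, a minimal, illustrative Middle Manager / Indexer runtime.properties entry is shown below; only this property comes from the change, and it assumes deep storage itself (S3, HDFS, local, etc.) is already configured, since the deepstore option writes shuffle data under the configured deep storage path in a shuffle-data directory that is not auto-cleaned yet.

# Illustrative only: route intermediary shuffle segments for native parallel
# batch ingestion to the configured deep storage instead of local disk.
# The default remains "local".
druid.processing.intermediaryData.storage.type=deepstore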
diff --git a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
index 3f2fd2f..b824232 100644
--- a/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
+++ b/extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
@@ -77,8 +77,13 @@ public class OssDataSegmentPusher implements DataSegmentPusher
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
       throws IOException
   {
-    final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
+    return pushToPath(indexFilesDir, inSegment, getStorageDir(inSegment, useUniquePath));
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix) throws IOException
+  {
+    final String path = OssUtils.constructSegmentPath(config.getPrefix(), storageDirSuffix);
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
 
     final File zipOutFile = File.createTempFile("druid", "index.zip");
diff --git a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
index c61443f..b96902c 100644
--- a/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
+++ b/extensions-contrib/cassandra-storage/src/main/java/org/apache/druid/storage/cassandra/CassandraDataSegmentPusher.java
@@ -77,11 +77,16 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
   public DataSegment push(final File indexFilesDir, DataSegment segment, final boolean useUniquePath) throws IOException
   {
     log.info("Writing [%s] to C*", indexFilesDir);
+    return pushToPath(indexFilesDir, segment, this.getStorageDir(segment, useUniquePath));
+  }
+
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
     String key = JOINER.join(
         config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
-        this.getStorageDir(segment, useUniquePath)
-    );
-
+        storageDirSuffix
+        );
     // Create index
     final File compressedIndexFile = File.createTempFile("druid", "index.zip");
     long indexSize = CompressionUtils.zip(indexFilesDir, compressedIndexFile);
diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
index 42fe23f..22ca444 100644
--- a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
+++ b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesDataSegmentPusher.java
@@ -71,11 +71,16 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
   @Override
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
   {
+    return pushToPath(indexFilesDir, inSegment, getStorageDir(inSegment, useUniquePath));
+  }
+
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix)
+  {
     final String segmentPath = CloudFilesUtils.buildCloudFilesPath(
         this.config.getBasePath(),
-        getStorageDir(inSegment, useUniquePath)
-    );
-
+        storageDirSuffix
+        );
     File descriptorFile = null;
     File zipOutFile = null;
 
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
index 461b107..1e46239 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java
@@ -91,10 +91,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   @Override
   public String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
   {
-    String prefix = segmentConfig.getPrefix();
-    boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix);
     String seg = JOINER.join(
-        prefixIsNullOrEmpty ? null : StringUtils.maybeRemoveTrailingSlash(prefix),
         dataSegment.getDataSource(),
         StringUtils.format(
             "%s_%s",
@@ -107,7 +104,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
         useUniquePath ? DataSegmentPusher.generateUniquePath() : null
     );
 
-    log.info("DataSegment: [%s]", seg);
+    log.info("DataSegment Suffix: [%s]", seg);
 
     // Replace colons with underscores, since they are not supported through wasb:// prefix
     return seg;
@@ -124,6 +121,19 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
       throws IOException
   {
     log.info("Uploading [%s] to Azure.", indexFilesDir);
+    final String azurePathSuffix = getAzurePath(segment, useUniquePath);
+    return pushToPath(indexFilesDir, segment, azurePathSuffix);
+  }
+
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
+    String prefix = segmentConfig.getPrefix();
+    boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix);
+    final String azurePath = JOINER.join(
+        prefixIsNullOrEmpty ? null : StringUtils.maybeRemoveTrailingSlash(prefix),
+        storageDirSuffix
+    );
 
     final int binaryVersion = SegmentUtils.getVersionFromDir(indexFilesDir);
     File zipOutFile = null;
@@ -132,8 +142,6 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
       final File outFile = zipOutFile = File.createTempFile("index", ".zip");
       final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
-      final String azurePath = getAzurePath(segment, useUniquePath);
-
       return AzureUtils.retryAzureOperation(
           () -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath),
           accountConfig.getMaxTries()
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
index 8024d67..f3e65c7 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java
@@ -145,7 +145,11 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    azureStorage.uploadBlob(
+        EasyMock.anyObject(File.class),
+        EasyMock.eq(CONTAINER_NAME),
+        EasyMock.eq(PREFIX + "/" + azurePath)
+    );
     EasyMock.expectLastCall();
 
     replayAll();
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java
index 35c763f..e352926 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPusher.java
@@ -104,15 +104,20 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
       throws IOException
   {
     log.debug("Uploading [%s] to Google.", indexFilesDir);
+    final String storageDir = this.getStorageDir(segment, useUniquePath);
+    return pushToPath(indexFilesDir, segment, storageDir);
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
     final int version = SegmentUtils.getVersionFromDir(indexFilesDir);
     File indexFile = null;
 
     try {
       indexFile = File.createTempFile("index", ".zip");
       final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
-      final String storageDir = this.getStorageDir(segment, useUniquePath);
-      final String indexPath = buildPath(storageDir + "/" + "index.zip");
+      final String indexPath = buildPath(storageDirSuffix + "/" + "index.zip");
 
       final DataSegment outSegment = segment
           .withSize(indexSize)
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 0354e3c..e262b40 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -105,13 +105,30 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
     // '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths.
     final String storageDir = this.getStorageDir(segment, false);
 
+
+    final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
+    final String outIndexFilePathSuffix = StringUtils.format(
+        "%s/%d_%sindex.zip",
+        storageDir,
+        segment.getShardSpec().getPartitionNum(),
+        uniquePrefix
+    );
+
+    return pushToPath(inDir, segment, outIndexFilePathSuffix);
+  }
+
+  @Override
+  public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException
+  {
     log.debug(
         "Copying segment[%s] to HDFS at location[%s/%s]",
         segment.getId(),
         fullyQualifiedStorageDirectory.get(),
-        storageDir
+        storageDirSuffix
     );
 
+    final String storageDir = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix);
+
     Path tmpIndexFile = new Path(StringUtils.format(
         "%s/%s/%s/%s_index.zip",
         fullyQualifiedStorageDirectory.get(),
@@ -130,16 +147,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
       try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
         size = CompressionUtils.zip(inDir, out);
       }
-
-      final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
-      final Path outIndexFile = new Path(StringUtils.format(
-          "%s/%s/%d_%sindex.zip",
-          fullyQualifiedStorageDirectory.get(),
-          storageDir,
-          segment.getShardSpec().getPartitionNum(),
-          uniquePrefix
-      ));
-
+      final Path outIndexFile = new Path(storageDir);
       dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
                            .withSize(size)
                            .withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
index 17dac63..da4b27e 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusher.java
@@ -79,8 +79,13 @@ public class S3DataSegmentPusher implements DataSegmentPusher
   public DataSegment push(final File indexFilesDir, final DataSegment inSegment, final boolean useUniquePath)
       throws IOException
   {
-    final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment, useUniquePath));
+    return pushToPath(indexFilesDir, inSegment, getStorageDir(inSegment, useUniquePath));
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String storageDirSuffix) throws IOException
+  {
+    final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), storageDirSuffix);
     log.debug("Copying segment[%s] to S3 at location[%s]", inSegment.getId(), s3Path);
 
     final File zipOutFile = File.createTempFile("druid", "index.zip");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java
new file mode 100644
index 0000000..f845b74
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * This class represents the intermediary deep storage location where the partition of {@code interval} and {@code shardSpec}
+ * is stored.
+ */
+public class DeepStoragePartitionLocation implements PartitionLocation
+{
+  private final String subTaskId;
+  private final Interval interval;
+  private final BuildingShardSpec shardSpec;
+  private final Map<String, Object> loadSpec;
+
+  @JsonCreator
+  public DeepStoragePartitionLocation(
+      @JsonProperty("subTaskId") String subTaskId,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("shardSpec") BuildingShardSpec shardSpec,
+      @JsonProperty("loadSpec") Map<String, Object> loadSpec
+  )
+  {
+    this.subTaskId = subTaskId;
+    this.interval = interval;
+    this.shardSpec = shardSpec;
+    this.loadSpec = loadSpec;
+  }
+
+  @JsonIgnore
+  @Override
+  public int getBucketId()
+  {
+    return shardSpec.getBucketId();
+  }
+
+  @JsonProperty
+  @Override
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @JsonProperty
+  @Override
+  public BuildingShardSpec getShardSpec()
+  {
+    return shardSpec;
+  }
+
+  @JsonProperty
+  @Override
+  public String getSubTaskId()
+  {
+    return subTaskId;
+  }
+
+  @JsonProperty
+  public Map<String, Object> getLoadSpec()
+  {
+    return loadSpec;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DeepStoragePartitionLocation that = (DeepStoragePartitionLocation) o;
+    return subTaskId.equals(that.subTaskId)
+           && interval.equals(that.interval)
+           && shardSpec.equals(that.shardSpec)
+           && loadSpec.equals(that.loadSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(subTaskId, interval, shardSpec, loadSpec);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeepStoragePartitionLocation{" +
+           "subTaskId='" + subTaskId + '\'' +
+           ", interval=" + interval +
+           ", shardSpec=" + shardSpec +
+           ", loadSpec=" + loadSpec +
+           '}';
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java
similarity index 53%
copy from indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
copy to indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java
index a4ac80b..7d78043 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java
@@ -22,54 +22,70 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
+import java.util.Map;
 import java.util.Objects;
 
 /**
- * Generic partition description ({@link ShardSpec}) and statistics created by {@link PartialSegmentGenerateTask}. Each
- * partition is a set of data of the same time chunk (primary partition key) and the same {@link ShardSpec} (secondary
- * partition key). The {@link ShardSpec} is later used by {@link PartialGenericSegmentMergeTask} to merge the partial
- * segments.
+ * Similar to {@link GenericPartitionStat} but contains information about deep storage location where it is stored
  */
-public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
+public class DeepStoragePartitionStat implements PartitionStat
 {
-  private static final String PROP_SHARD_SPEC = "shardSpec";
-
+  public static final String TYPE = "deepstore";
+  static final String PROP_SHARD_SPEC = "shardSpec";
+  private final Map<String, Object> loadSpec;
+  // Primary partition key
+  private final Interval interval;
   // Secondary partition key
   private final BucketNumberedShardSpec shardSpec;
 
   @JsonCreator
-  public GenericPartitionStat(
-      @JsonProperty("taskExecutorHost") String taskExecutorHost,
-      @JsonProperty("taskExecutorPort") int taskExecutorPort,
-      @JsonProperty("useHttps") boolean useHttps,
+  public DeepStoragePartitionStat(
       @JsonProperty("interval") Interval interval,
       @JsonProperty(PROP_SHARD_SPEC) BucketNumberedShardSpec shardSpec,
-      @JsonProperty("numRows") @Nullable Integer numRows,
-      @JsonProperty("sizeBytes") @Nullable Long sizeBytes
+      @JsonProperty("loadSpec") Map<String, Object> loadSpec
   )
   {
-    super(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes);
+    this.interval = interval;
     this.shardSpec = shardSpec;
+    this.loadSpec = loadSpec;
+  }
+
+  @JsonProperty
+  public Map<String, Object> getLoadSpec()
+  {
+    return loadSpec;
   }
 
+  @JsonProperty
   @Override
-  public int getBucketId()
+  public Interval getInterval()
   {
-    return shardSpec.getBucketId();
+    return interval;
   }
 
   @JsonProperty(PROP_SHARD_SPEC)
   @Override
-  BucketNumberedShardSpec getSecondaryPartition()
+  public BucketNumberedShardSpec getSecondaryPartition()
   {
     return shardSpec;
   }
 
   @Override
+  public int getBucketId()
+  {
+    return shardSpec.getBucketId();
+  }
+
+  @Override
+  public DeepStoragePartitionLocation toPartitionLocation(String subtaskId, BuildingShardSpec secondaryParition)
+  {
+    return new DeepStoragePartitionLocation(subtaskId, interval, secondaryParition, loadSpec);
+  }
+
+  @Override
   public boolean equals(Object o)
   {
     if (this == o) {
@@ -78,16 +94,23 @@ public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    if (!super.equals(o)) {
-      return false;
-    }
-    GenericPartitionStat that = (GenericPartitionStat) o;
-    return Objects.equals(shardSpec, that.shardSpec);
+    DeepStoragePartitionStat that = (DeepStoragePartitionStat) o;
+    return loadSpec.equals(that.loadSpec) && interval.equals(that.interval) && shardSpec.equals(that.shardSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), shardSpec);
+    return Objects.hash(loadSpec, interval, shardSpec);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeepStoragePartitionStat{" +
+           "loadSpec=" + loadSpec +
+           ", interval=" + interval +
+           ", shardSpec=" + shardSpec +
+           '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClient.java
new file mode 100644
index 0000000..ee906ba
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClient.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.commons.io.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.SegmentLoadingException;
+
+import java.io.File;
+import java.io.IOException;
+
+public class DeepStorageShuffleClient implements ShuffleClient<DeepStoragePartitionLocation>
+{
+  private static final Logger LOG = new Logger(DeepStorageShuffleClient.class);
+  private final ObjectMapper objectMapper;
+
+  @Inject
+  public DeepStorageShuffleClient(ObjectMapper objectMapper)
+  {
+    this.objectMapper = objectMapper;
+  }
+
+  @Override
+  public File fetchSegmentFile(File partitionDir, String supervisorTaskId, DeepStoragePartitionLocation location)
+      throws IOException
+  {
+    final LoadSpec loadSpec = objectMapper.convertValue(location.getLoadSpec(), LoadSpec.class);
+    final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
+    FileUtils.forceMkdir(unzippedDir);
+    try {
+      loadSpec.loadSegment(unzippedDir);
+    }
+    catch (SegmentLoadingException e) {
+      LOG.error(e, "Failed to load segment");
+      throw new IOException(e);
+    }
+    return unzippedDir;
+  }
+}
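
As a usage illustration (not code from this commit), the sketch below shows how a merge subtask could fetch a partition through this client. It assumes the supplied ObjectMapper can resolve the location's loadSpec into a LoadSpec, which Druid normally wires up through its Jackson/Guice modules, and that partitionDir and the location come from the merge task and the generate-phase report.

import java.io.File;
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionLocation;
import org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;

// Minimal sketch: fetch a partition that was shuffled through deep storage.
public class DeepStorageFetchSketch
{
  static File fetch(ObjectMapper mapper, File partitionDir, DeepStoragePartitionLocation location)
      throws IOException
  {
    DeepStorageShuffleClient shuffleClient = new DeepStorageShuffleClient(mapper);
    // Resolves location.getLoadSpec() into a LoadSpec and unpacks the segment into
    // partitionDir/unzipped_<subTaskId>, as implemented above.
    return shuffleClient.fetchSegmentFile(partitionDir, "supervisor-task-id", location);
  }
}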
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
index b19a5fc..553cc1d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsMetadataReport.java
@@ -26,16 +26,16 @@ import java.util.List;
 
 /**
  * Report containing the {@link GenericPartitionStat}s created by a {@link PartialSegmentGenerateTask}. This report is
- * collected by {@link ParallelIndexSupervisorTask} and used to generate {@link PartialGenericSegmentMergeIOConfig}.
+ * collected by {@link ParallelIndexSupervisorTask} and used to generate {@link PartialSegmentMergeIOConfig}.
  */
-class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport<GenericPartitionStat>
+class GeneratedPartitionsMetadataReport extends GeneratedPartitionsReport
 {
   public static final String TYPE = "generated_partitions_metadata";
 
   @JsonCreator
   GeneratedPartitionsMetadataReport(
       @JsonProperty("taskId") String taskId,
-      @JsonProperty("partitionStats") List<GenericPartitionStat> partitionStats
+      @JsonProperty("partitionStats") List<PartitionStat> partitionStats
   )
   {
     super(taskId, partitionStats);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
index bfe8cef..d05d4dd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GeneratedPartitionsReport.java
@@ -29,12 +29,12 @@ import java.util.Objects;
  * This report is collected by {@link ParallelIndexSupervisorTask} and
  * used to generate {@link PartialSegmentMergeIOConfig}.
  */
-abstract class GeneratedPartitionsReport<T extends PartitionStat> implements SubTaskReport
+public class GeneratedPartitionsReport implements SubTaskReport
 {
   private final String taskId;
-  private final List<T> partitionStats;
+  private final List<PartitionStat> partitionStats;
 
-  GeneratedPartitionsReport(String taskId, List<T> partitionStats)
+  GeneratedPartitionsReport(String taskId, List<PartitionStat> partitionStats)
   {
     this.taskId = taskId;
     this.partitionStats = partitionStats;
@@ -48,7 +48,7 @@ abstract class GeneratedPartitionsReport<T extends PartitionStat> implements Sub
   }
 
   @JsonProperty
-  public List<T> getPartitionStats()
+  public List<PartitionStat> getPartitionStats()
   {
     return partitionStats;
   }
@@ -72,4 +72,13 @@ abstract class GeneratedPartitionsReport<T extends PartitionStat> implements Sub
   {
     return Objects.hash(taskId, partitionStats);
   }
+
+  @Override
+  public String toString()
+  {
+    return "GeneratedPartitionsReport{" +
+        "taskId='" + taskId + '\'' +
+        ", partitionStats=" + partitionStats +
+        '}';
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java
index 74c4c17..c921efe 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocation.java
@@ -22,15 +22,26 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.joda.time.Interval;
 
+import java.net.URI;
+import java.util.Objects;
+
 /**
  * This class represents the intermediary data server where the partition of {@code interval} and {@code shardSpec}
  * is stored.
  */
-public class GenericPartitionLocation extends PartitionLocation<BuildingShardSpec>
+public class GenericPartitionLocation implements PartitionLocation
 {
+  private final String host;
+  private final int port;
+  private final boolean useHttps;
+  private final String subTaskId;
+  private final Interval interval;
+  private final BuildingShardSpec shardSpec;
+
   @JsonCreator
   public GenericPartitionLocation(
       @JsonProperty("host") String host,
@@ -41,19 +52,111 @@ public class GenericPartitionLocation extends PartitionLocation<BuildingShardSpe
       @JsonProperty("shardSpec") BuildingShardSpec shardSpec
   )
   {
-    super(host, port, useHttps, subTaskId, interval, shardSpec);
+    this.host = host;
+    this.port = port;
+    this.useHttps = useHttps;
+    this.subTaskId = subTaskId;
+    this.interval = interval;
+    this.shardSpec = shardSpec;
+  }
+
+  @JsonProperty
+  public String getHost()
+  {
+    return host;
+  }
+
+  @JsonProperty
+  public int getPort()
+  {
+    return port;
+  }
+
+  @JsonProperty
+  public boolean isUseHttps()
+  {
+    return useHttps;
+  }
+
+  @JsonProperty
+  @Override
+  public String getSubTaskId()
+  {
+    return subTaskId;
+  }
+
+  @JsonProperty
+  @Override
+  public Interval getInterval()
+  {
+    return interval;
   }
 
   @JsonIgnore
   @Override
   public int getBucketId()
   {
-    return getSecondaryPartition().getBucketId();
+    return shardSpec.getBucketId();
   }
 
   @JsonProperty
-  BuildingShardSpec getShardSpec()
+  @Override
+  public BuildingShardSpec getShardSpec()
+  {
+    return shardSpec;
+  }
+
+  final URI toIntermediaryDataServerURI(String supervisorTaskId)
+  {
+    return URI.create(
+        StringUtils.format(
+            "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&bucketId=%d",
+            useHttps ? "https" : "http",
+            host,
+            port,
+            StringUtils.urlEncode(supervisorTaskId),
+            StringUtils.urlEncode(subTaskId),
+            interval.getStart(),
+            interval.getEnd(),
+            getBucketId()
+        )
+    );
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GenericPartitionLocation that = (GenericPartitionLocation) o;
+    return port == that.port
+           && useHttps == that.useHttps
+           && host.equals(that.host)
+           && subTaskId.equals(that.subTaskId)
+           && interval.equals(that.interval)
+           && shardSpec.equals(that.shardSpec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(host, port, useHttps, subTaskId, interval, shardSpec);
+  }
+
+  @Override
+  public String toString()
   {
-    return getSecondaryPartition();
+    return "GenericPartitionLocation{" +
+           "host='" + host + '\'' +
+           ", port=" + port +
+           ", useHttps=" + useHttps +
+           ", subTaskId='" + subTaskId + '\'' +
+           ", interval=" + interval +
+           ", shardSpec=" + shardSpec +
+           '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
index a4ac80b..aada0a2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStat.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
 
@@ -34,10 +35,22 @@ import java.util.Objects;
  * partition key). The {@link ShardSpec} is later used by {@link PartialGenericSegmentMergeTask} to merge the partial
  * segments.
  */
-public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
+public class GenericPartitionStat implements PartitionStat
 {
-  private static final String PROP_SHARD_SPEC = "shardSpec";
+  public static final String TYPE = "local";
+  static final String PROP_SHARD_SPEC = "shardSpec";
 
+  // Host and port of the task executor
+  private final String taskExecutorHost;
+  private final int taskExecutorPort;
+  private final boolean useHttps;
+  // Primary partition key
+  private final Interval interval;
+  // numRows and sizeBytes are always null currently and will be filled properly in the future.
+  @Nullable
+  private final Integer numRows;
+  @Nullable
+  private final Long sizeBytes;
   // Secondary partition key
   private final BucketNumberedShardSpec shardSpec;
 
@@ -52,10 +65,54 @@ public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
       @JsonProperty("sizeBytes") @Nullable Long sizeBytes
   )
   {
-    super(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes);
+    this.taskExecutorHost = taskExecutorHost;
+    this.taskExecutorPort = taskExecutorPort;
+    this.useHttps = useHttps;
+    this.interval = interval;
+    this.numRows = numRows == null ? 0 : numRows;
+    this.sizeBytes = sizeBytes == null ? 0 : sizeBytes;
     this.shardSpec = shardSpec;
   }
 
+  @JsonProperty
+  public final String getTaskExecutorHost()
+  {
+    return taskExecutorHost;
+  }
+
+  @JsonProperty
+  public final int getTaskExecutorPort()
+  {
+    return taskExecutorPort;
+  }
+
+  @JsonProperty
+  public final boolean isUseHttps()
+  {
+    return useHttps;
+  }
+
+  @JsonProperty
+  @Override
+  public final Interval getInterval()
+  {
+    return interval;
+  }
+
+  @Nullable
+  @JsonProperty
+  public final Integer getNumRows()
+  {
+    return numRows;
+  }
+
+  @Nullable
+  @JsonProperty
+  public final Long getSizeBytes()
+  {
+    return sizeBytes;
+  }
+
   @Override
   public int getBucketId()
   {
@@ -64,12 +121,25 @@ public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
 
   @JsonProperty(PROP_SHARD_SPEC)
   @Override
-  BucketNumberedShardSpec getSecondaryPartition()
+  public BucketNumberedShardSpec getSecondaryPartition()
   {
     return shardSpec;
   }
 
   @Override
+  public GenericPartitionLocation toPartitionLocation(String subtaskId, BuildingShardSpec secondaryParition)
+  {
+    return new GenericPartitionLocation(
+        getTaskExecutorHost(),
+        getTaskExecutorPort(),
+        isUseHttps(),
+        subtaskId,
+        getInterval(),
+        secondaryParition
+    );
+  }
+
+  @Override
   public boolean equals(Object o)
   {
     if (this == o) {
@@ -78,16 +148,33 @@ public class GenericPartitionStat extends PartitionStat<BucketNumberedShardSpec>
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    if (!super.equals(o)) {
-      return false;
-    }
     GenericPartitionStat that = (GenericPartitionStat) o;
-    return Objects.equals(shardSpec, that.shardSpec);
+    return taskExecutorPort == that.taskExecutorPort
+           && useHttps == that.useHttps
+           && taskExecutorHost.equals(that.taskExecutorHost)
+           && interval.equals(that.interval)
+           && Objects.equals(numRows, that.numRows)
+           && Objects.equals(sizeBytes, that.sizeBytes)
+           && shardSpec.equals(that.shardSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(super.hashCode(), shardSpec);
+    return Objects.hash(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes, shardSpec);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "GenericPartitionStat{" +
+           "taskExecutorHost='" + taskExecutorHost + '\'' +
+           ", taskExecutorPort=" + taskExecutorPort +
+           ", useHttps=" + useHttps +
+           ", interval=" + interval +
+           ", numRows=" + numRows +
+           ", sizeBytes=" + sizeBytes +
+           ", shardSpec=" + shardSpec +
+           '}';
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
index 58c41e6..13abcae 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClient.java
@@ -24,9 +24,11 @@ import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.EscalatedClient;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.utils.CompressionUtils;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 
 import java.io.File;
@@ -38,8 +40,10 @@ import java.util.concurrent.ExecutionException;
  * HTTP-based ShuffleClient.
  * This class is injected as a lazy singleton instance and thus must be stateless.
  */
-public class HttpShuffleClient implements ShuffleClient
+public class HttpShuffleClient implements ShuffleClient<GenericPartitionLocation>
 {
+  private static final Logger LOG = new Logger(HttpShuffleClient.class);
+
   @VisibleForTesting
   static final int NUM_FETCH_RETRIES = 3;
 
@@ -54,10 +58,10 @@ public class HttpShuffleClient implements ShuffleClient
   }
 
   @Override
-  public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
+  public File fetchSegmentFile(
       File partitionDir,
       String supervisorTaskId,
-      P location
+      GenericPartitionLocation location
   ) throws IOException
   {
     // Create a local buffer since this class is not thread-safe.
@@ -82,6 +86,16 @@ public class HttpShuffleClient implements ShuffleClient
         NUM_FETCH_RETRIES,
         StringUtils.format("Failed to fetch file[%s]", uri)
     );
-    return zippedFile;
+    final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
+    try {
+      org.apache.commons.io.FileUtils.forceMkdir(unzippedDir);
+      CompressionUtils.unzip(zippedFile, unzippedDir);
+    }
+    finally {
+      if (!zippedFile.delete()) {
+        LOG.warn("Failed to delete temp file[%s]", zippedFile);
+      }
+    }
+    return unzippedDir;
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index d2a3ca7..b66f49f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -355,7 +355,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @VisibleForTesting
   PartialGenericSegmentMergeParallelIndexTaskRunner createPartialGenericSegmentMergeRunner(
       TaskToolbox toolbox,
-      List<PartialGenericSegmentMergeIOConfig> ioConfigs,
+      List<PartialSegmentMergeIOConfig> ioConfigs,
       ParallelIndexIngestionSpec ingestionSchema
   )
   {
@@ -693,7 +693,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
     // 1. Partial segment generation phase
     final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
-    ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
+    ParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport> indexingRunner =
         createRunner(
             toolbox,
             f -> createPartialHashSegmentGenerateRunner(toolbox, segmentCreateIngestionSpec, intervalToNumShards)
@@ -710,9 +710,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
     // 2. Partial segment merge phase
     // partition (interval, partitionId) -> partition locations
-    Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations =
+    Map<Pair<Interval, Integer>, List<PartitionLocation>> partitionToLocations =
         groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
-    final List<PartialGenericSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
+    final List<PartialSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
         ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
         partitionToLocations
     );
@@ -780,7 +780,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     );
 
     final ParallelIndexIngestionSpec segmentCreateIngestionSpec = ingestionSchemaToUse;
-    ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>> indexingRunner =
+    ParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport> indexingRunner =
         createRunner(
             toolbox,
             tb -> createPartialRangeSegmentGenerateRunner(tb, intervalToPartitions, segmentCreateIngestionSpec)
@@ -796,9 +796,9 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     }
 
     // partition (interval, partitionId) -> partition locations
-    Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations =
+    Map<Pair<Interval, Integer>, List<PartitionLocation>> partitionToLocations =
         groupGenericPartitionLocationsPerPartition(indexingRunner.getReports());
-    final List<PartialGenericSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
+    final List<PartialSegmentMergeIOConfig> ioConfigs = createGenericMergeIOConfigs(
         ingestionSchema.getTuningConfig().getTotalNumMergeTasks(),
         partitionToLocations
     );
@@ -905,13 +905,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     return partitions;
   }
 
-  private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> groupGenericPartitionLocationsPerPartition(
-      Map<String, GeneratedPartitionsReport<GenericPartitionStat>> subTaskIdToReport
+  private static Map<Pair<Interval, Integer>, List<PartitionLocation>> groupGenericPartitionLocationsPerPartition(
+      Map<String, GeneratedPartitionsReport> subTaskIdToReport
   )
   {
     final Map<Pair<Interval, Integer>, BuildingShardSpec<?>> intervalAndIntegerToShardSpec = new HashMap<>();
     final Object2IntMap<Interval> intervalToNextPartitionId = new Object2IntOpenHashMap<>();
-    final BiFunction<String, GenericPartitionStat, GenericPartitionLocation> createPartitionLocationFunction =
+    final BiFunction<String, PartitionStat, PartitionLocation> createPartitionLocationFunction =
         (subtaskId, partitionStat) -> {
           final BuildingShardSpec<?> shardSpec = intervalAndIntegerToShardSpec.computeIfAbsent(
               Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
@@ -925,31 +925,24 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
                 return partitionStat.getSecondaryPartition().convert(partitionId);
               }
           );
-          return new GenericPartitionLocation(
-              partitionStat.getTaskExecutorHost(),
-              partitionStat.getTaskExecutorPort(),
-              partitionStat.isUseHttps(),
-              subtaskId,
-              partitionStat.getInterval(),
-              shardSpec
-          );
+          return partitionStat.toPartitionLocation(subtaskId, shardSpec);
         };
 
     return groupPartitionLocationsPerPartition(subTaskIdToReport, createPartitionLocationFunction);
   }
 
-  private static <S extends PartitionStat, L extends PartitionLocation>
+  private static <L extends PartitionLocation>
       Map<Pair<Interval, Integer>, List<L>> groupPartitionLocationsPerPartition(
-      Map<String, ? extends GeneratedPartitionsReport<S>> subTaskIdToReport,
-      BiFunction<String, S, L> createPartitionLocationFunction
+      Map<String, ? extends GeneratedPartitionsReport> subTaskIdToReport,
+      BiFunction<String, PartitionStat, L> createPartitionLocationFunction
   )
   {
     // partition (interval, partitionId) -> partition locations
     final Map<Pair<Interval, Integer>, List<L>> partitionToLocations = new HashMap<>();
-    for (Entry<String, ? extends GeneratedPartitionsReport<S>> entry : subTaskIdToReport.entrySet()) {
+    for (Entry<String, ? extends GeneratedPartitionsReport> entry : subTaskIdToReport.entrySet()) {
       final String subTaskId = entry.getKey();
-      final GeneratedPartitionsReport<S> report = entry.getValue();
-      for (S partitionStat : report.getPartitionStats()) {
+      final GeneratedPartitionsReport report = entry.getValue();
+      for (PartitionStat partitionStat : report.getPartitionStats()) {
         final List<L> locationsOfSamePartition = partitionToLocations.computeIfAbsent(
             Pair.of(partitionStat.getInterval(), partitionStat.getBucketId()),
             k -> new ArrayList<>()
@@ -961,15 +954,15 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     return partitionToLocations;
   }
 
-  private static List<PartialGenericSegmentMergeIOConfig> createGenericMergeIOConfigs(
+  private static List<PartialSegmentMergeIOConfig> createGenericMergeIOConfigs(
       int totalNumMergeTasks,
-      Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> partitionToLocations
+      Map<Pair<Interval, Integer>, List<PartitionLocation>> partitionToLocations
   )
   {
     return createMergeIOConfigs(
         totalNumMergeTasks,
         partitionToLocations,
-        PartialGenericSegmentMergeIOConfig::new
+        PartialSegmentMergeIOConfig::new
     );
   }
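
Illustration (not part of the patch): with the refactor above, the supervisor maps each reported PartitionStat to its PartitionLocation through the stat itself, so the same code serves both the worker-local and the deep-storage shuffle. A minimal sketch of that delegation, using only the interfaces introduced in this change:

    package org.apache.druid.indexing.common.task.batch.parallel;

    import org.apache.druid.timeline.partition.BuildingShardSpec;

    // Sketch: supervisor-side mapping from a reported PartitionStat to the location the
    // merge phase will fetch from. The concrete subtype (HTTP worker vs. deep storage)
    // is chosen by the stat itself, so this code is identical for both shuffle modes.
    final class StatToLocationSketch
    {
      private StatToLocationSketch() {}

      static PartitionLocation locate(String subTaskId, PartitionStat stat, BuildingShardSpec<?> shardSpec)
      {
        return stat.toPartitionLocation(subTaskId, shardSpec);
      }
    }
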
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfig.java
deleted file mode 100644
index bbec73f..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.druid.segment.indexing.IOConfig;
-
-import java.util.List;
-
-@JsonTypeName(PartialGenericSegmentMergeTask.TYPE)
-class PartialGenericSegmentMergeIOConfig extends PartialSegmentMergeIOConfig<GenericPartitionLocation>
-    implements IOConfig
-{
-  @JsonCreator
-  PartialGenericSegmentMergeIOConfig(
-      @JsonProperty("partitionLocations") List<GenericPartitionLocation> partitionLocations
-  )
-  {
-    super(partitionLocations);
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpec.java
deleted file mode 100644
index 52edad6..0000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpec.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task.batch.parallel;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.indexing.DataSchema;
-
-class PartialGenericSegmentMergeIngestionSpec
-    extends PartialSegmentMergeIngestionSpec<PartialGenericSegmentMergeIOConfig>
-{
-  @JsonCreator
-  PartialGenericSegmentMergeIngestionSpec(
-      @JsonProperty("dataSchema") DataSchema dataSchema,
-      @JsonProperty("ioConfig") PartialGenericSegmentMergeIOConfig ioConfig,
-      @JsonProperty("tuningConfig") ParallelIndexTuningConfig tuningConfig
-  )
-  {
-    super(dataSchema, ioConfig, tuningConfig);
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
index 4050a01..8babf50 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeParallelIndexTaskRunner.java
@@ -37,7 +37,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
   private static final String PHASE_NAME = "partial segment merge";
 
   private final DataSchema dataSchema;
-  private final List<PartialGenericSegmentMergeIOConfig> mergeIOConfigs;
+  private final List<PartialSegmentMergeIOConfig> mergeIOConfigs;
 
   PartialGenericSegmentMergeParallelIndexTaskRunner(
       TaskToolbox toolbox,
@@ -45,7 +45,7 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
       String groupId,
       String baseSubtaskSpecName,
       DataSchema dataSchema,
-      List<PartialGenericSegmentMergeIOConfig> mergeIOConfigs,
+      List<PartialSegmentMergeIOConfig> mergeIOConfigs,
       ParallelIndexTuningConfig tuningConfig,
       Map<String, Object> context
   )
@@ -75,9 +75,9 @@ class PartialGenericSegmentMergeParallelIndexTaskRunner
   }
 
   @VisibleForTesting
-  SubTaskSpec<PartialGenericSegmentMergeTask> newTaskSpec(PartialGenericSegmentMergeIOConfig ioConfig)
+  SubTaskSpec<PartialGenericSegmentMergeTask> newTaskSpec(PartialSegmentMergeIOConfig ioConfig)
   {
-    final PartialGenericSegmentMergeIngestionSpec ingestionSpec = new PartialGenericSegmentMergeIngestionSpec(
+    final PartialSegmentMergeIngestionSpec ingestionSpec = new PartialSegmentMergeIngestionSpec(
         dataSchema,
         ioConfig,
         getTuningConfig()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
index 5dafc6e..72f9020 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTask.java
@@ -38,11 +38,11 @@ import java.util.Map;
 /**
  * {@link ParallelIndexTaskRunner} for the phase to merge generic partitioned segments in multi-phase parallel indexing.
  */
-public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<BuildingShardSpec, GenericPartitionLocation>
+public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<BuildingShardSpec>
 {
   public static final String TYPE = "partial_index_generic_merge";
 
-  private final PartialGenericSegmentMergeIngestionSpec ingestionSchema;
+  private final PartialSegmentMergeIngestionSpec ingestionSchema;
   private final Table<Interval, Integer, BuildingShardSpec<?>> intervalAndIntegerToShardSpec;
 
   @JsonCreator
@@ -55,7 +55,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
       // subtaskSpecId can be null only for old task versions.
       @JsonProperty("subtaskSpecId") @Nullable final String subtaskSpecId,
       @JsonProperty("numAttempts") final int numAttempts, // zero-based counting
-      @JsonProperty("spec") final PartialGenericSegmentMergeIngestionSpec ingestionSchema,
+      @JsonProperty("spec") final PartialSegmentMergeIngestionSpec ingestionSchema,
       @JsonProperty("context") final Map<String, Object> context
   )
   {
@@ -79,7 +79,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
   }
 
   private static Table<Interval, Integer, BuildingShardSpec<?>> createIntervalAndIntegerToShardSpec(
-      List<GenericPartitionLocation> partitionLocations
+      List<PartitionLocation> partitionLocations
   )
   {
     final Table<Interval, Integer, BuildingShardSpec<?>> intervalAndIntegerToShardSpec = HashBasedTable.create();
@@ -107,7 +107,7 @@ public class PartialGenericSegmentMergeTask extends PartialSegmentMergeTask<Buil
   }
 
   @JsonProperty("spec")
-  private PartialGenericSegmentMergeIngestionSpec getIngestionSchema()
+  private PartialSegmentMergeIngestionSpec getIngestionSchema()
   {
     return ingestionSchema;
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
index 2f4cea9..3bb9a15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateParallelIndexTaskRunner.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * {@link ParallelIndexTaskRunner} for the phase to create hash partitioned segments in multi-phase parallel indexing.
  */
 class PartialHashSegmentGenerateParallelIndexTaskRunner
-    extends InputSourceSplitParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>>
+    extends InputSourceSplitParallelIndexTaskRunner<PartialHashSegmentGenerateTask, GeneratedPartitionsReport>
 {
   private static final String PHASE_NAME = "partial segment generation";
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
index 9770653..ea320a4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java
@@ -33,7 +33,6 @@ import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalys
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.Interval;
 
@@ -164,25 +163,12 @@ public class PartialHashSegmentGenerateTask extends PartialSegmentGenerateTask<G
   @Override
   GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
   {
-    List<GenericPartitionStat> partitionStats = segments.stream()
-                                                        .map(segment -> createPartitionStat(toolbox, segment))
+    List<PartitionStat> partitionStats = segments.stream()
+                                                        .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
                                                         .collect(Collectors.toList());
     return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
   }
 
-  private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegment segment)
-  {
-    return new GenericPartitionStat(
-        toolbox.getTaskExecutorNode().getHost(),
-        toolbox.getTaskExecutorNode().getPortToUse(),
-        toolbox.getTaskExecutorNode().isEnableTlsPort(),
-        segment.getInterval(),
-        (BucketNumberedShardSpec) segment.getShardSpec(),
-        null, // numRows is not supported yet
-        null  // sizeBytes is not supported yet
-    );
-  }
-
   /**
    * Creates shard specs based on the given configurations. The return value is a map between intervals created
    * based on the segment granularity and the shard specs to be created.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java
index cec1f7a..d37b432 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateParallelIndexTaskRunner.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * {@link ParallelIndexTaskRunner} for the phase to create range partitioned segments in multi-phase parallel indexing.
  */
 class PartialRangeSegmentGenerateParallelIndexTaskRunner
-    extends InputSourceSplitParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport<GenericPartitionStat>>
+    extends InputSourceSplitParallelIndexTaskRunner<PartialRangeSegmentGenerateTask, GeneratedPartitionsReport>
 {
   private static final String PHASE_NAME = "partial segment generation";
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index a67a3df..61287e0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -34,7 +34,6 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartit
 import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis;
 import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.joda.time.Interval;
 
@@ -176,22 +175,9 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
   @Override
   GeneratedPartitionsMetadataReport createGeneratedPartitionsReport(TaskToolbox toolbox, List<DataSegment> segments)
   {
-    List<GenericPartitionStat> partitionStats = segments.stream()
-                                                        .map(segment -> createPartitionStat(toolbox, segment))
+    List<PartitionStat> partitionStats = segments.stream()
+                                                        .map(segment -> toolbox.getIntermediaryDataManager().generatePartitionStat(toolbox, segment))
                                                         .collect(Collectors.toList());
     return new GeneratedPartitionsMetadataReport(getId(), partitionStats);
   }
-
-  private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegment segment)
-  {
-    return new GenericPartitionStat(
-        toolbox.getTaskExecutorNode().getHost(),
-        toolbox.getTaskExecutorNode().getPortToUse(),
-        toolbox.getTaskExecutorNode().isEnableTlsPort(),
-        segment.getInterval(),
-        (BucketNumberedShardSpec) segment.getShardSpec(),
-        null, // numRows is not supported yet
-        null  // sizeBytes is not supported yet
-    );
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java
index 50e3a09..90990a5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfig.java
@@ -19,17 +19,21 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.segment.indexing.IOConfig;
 
 import java.util.List;
 
-abstract class PartialSegmentMergeIOConfig<T extends PartitionLocation> implements IOConfig
+@JsonTypeName(PartialGenericSegmentMergeTask.TYPE)
+public class PartialSegmentMergeIOConfig implements IOConfig
 {
-  private final List<T> partitionLocations;
+  private final List<PartitionLocation> partitionLocations;
 
-  PartialSegmentMergeIOConfig(List<T> partitionLocations)
+  @JsonCreator
+  PartialSegmentMergeIOConfig(@JsonProperty("partitionLocations") List<PartitionLocation> partitionLocations)
   {
     Preconditions.checkState(
         partitionLocations != null && !partitionLocations.isEmpty(),
@@ -39,7 +43,7 @@ abstract class PartialSegmentMergeIOConfig<T extends PartitionLocation> implemen
   }
 
   @JsonProperty
-  public List<T> getPartitionLocations()
+  public List<PartitionLocation> getPartitionLocations()
   {
     return partitionLocations;
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java
index b0ea81d..2683844 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpec.java
@@ -19,16 +19,16 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.IngestionSpec;
 
-abstract class PartialSegmentMergeIngestionSpec<T extends PartialSegmentMergeIOConfig>
-    extends IngestionSpec<T, ParallelIndexTuningConfig>
+public class PartialSegmentMergeIngestionSpec extends IngestionSpec<PartialSegmentMergeIOConfig, ParallelIndexTuningConfig>
 {
   PartialSegmentMergeIngestionSpec(
-      DataSchema dataSchema,
-      T ioConfig,
-      ParallelIndexTuningConfig tuningConfig
+      @JsonProperty("dataSchema") DataSchema dataSchema,
+      @JsonProperty("ioConfig") PartialSegmentMergeIOConfig ioConfig,
+      @JsonProperty("tuningConfig") ParallelIndexTuningConfig tuningConfig
   )
   {
     super(dataSchema, ioConfig, tuningConfig);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
index 81cc66e..38a7261 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java
@@ -49,7 +49,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.ShardSpec;
-import org.apache.druid.utils.CompressionUtils;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -70,12 +69,12 @@ import java.util.stream.Collectors;
 /**
  * Base class for creating task that merges partial segments created by {@link PartialSegmentGenerateTask}.
  */
-abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionLocation> extends PerfectRollupWorkerTask
+abstract class PartialSegmentMergeTask<S extends ShardSpec> extends PerfectRollupWorkerTask
 {
   private static final Logger LOG = new Logger(PartialSegmentMergeTask.class);
 
 
-  private final PartialSegmentMergeIOConfig<P> ioConfig;
+  private final PartialSegmentMergeIOConfig ioConfig;
   private final int numAttempts;
   private final String supervisorTaskId;
   private final String subtaskSpecId;
@@ -88,7 +87,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
       final String supervisorTaskId,
       @Nullable String subtaskSpecId,
       DataSchema dataSchema,
-      PartialSegmentMergeIOConfig<P> ioConfig,
+      PartialSegmentMergeIOConfig ioConfig,
       ParallelIndexTuningConfig tuningConfig,
       final int numAttempts, // zero-based counting
       final Map<String, Object> context
@@ -142,8 +141,8 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
     // Group partitionLocations by interval and partitionId
-    final Map<Interval, Int2ObjectMap<List<P>>> intervalToBuckets = new HashMap<>();
-    for (P location : ioConfig.getPartitionLocations()) {
+    final Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToBuckets = new HashMap<>();
+    for (PartitionLocation location : ioConfig.getPartitionLocations()) {
       intervalToBuckets.computeIfAbsent(location.getInterval(), k -> new Int2ObjectOpenHashMap<>())
                          .computeIfAbsent(location.getBucketId(), k -> new ArrayList<>())
                          .add(location);
@@ -205,7 +204,7 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
 
   private Map<Interval, Int2ObjectMap<List<File>>> fetchSegmentFiles(
       TaskToolbox toolbox,
-      Map<Interval, Int2ObjectMap<List<P>>> intervalToBuckets
+      Map<Interval, Int2ObjectMap<List<PartitionLocation>>> intervalToBuckets
   ) throws IOException
   {
     final File tempDir = toolbox.getIndexingTmpDir();
@@ -214,9 +213,9 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
 
     final Map<Interval, Int2ObjectMap<List<File>>> intervalToUnzippedFiles = new HashMap<>();
     // Fetch partition files
-    for (Entry<Interval, Int2ObjectMap<List<P>>> entryPerInterval : intervalToBuckets.entrySet()) {
+    for (Entry<Interval, Int2ObjectMap<List<PartitionLocation>>> entryPerInterval : intervalToBuckets.entrySet()) {
       final Interval interval = entryPerInterval.getKey();
-      for (Int2ObjectMap.Entry<List<P>> entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) {
+      for (Int2ObjectMap.Entry<List<PartitionLocation>> entryPerBucketId : entryPerInterval.getValue().int2ObjectEntrySet()) {
         final int bucketId = entryPerBucketId.getIntKey();
         final File partitionDir = FileUtils.getFile(
             tempDir,
@@ -225,21 +224,11 @@ abstract class PartialSegmentMergeTask<S extends ShardSpec, P extends PartitionL
             Integer.toString(bucketId)
         );
         FileUtils.forceMkdir(partitionDir);
-        for (P location : entryPerBucketId.getValue()) {
-          final File zippedFile = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, location);
-          try {
-            final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
-            FileUtils.forceMkdir(unzippedDir);
-            CompressionUtils.unzip(zippedFile, unzippedDir);
-            intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>())
-                                   .computeIfAbsent(bucketId, k -> new ArrayList<>())
-                                   .add(unzippedDir);
-          }
-          finally {
-            if (!zippedFile.delete()) {
-              LOG.warn("Failed to delete temp file[%s]", zippedFile);
-            }
-          }
+        for (PartitionLocation location : entryPerBucketId.getValue()) {
+          final File unzippedDir = toolbox.getShuffleClient().fetchSegmentFile(partitionDir, supervisorTaskId, location);
+          intervalToUnzippedFiles.computeIfAbsent(interval, k -> new Int2ObjectOpenHashMap<>())
+              .computeIfAbsent(bucketId, k -> new ArrayList<>())
+              .add(unzippedDir);
         }
       }
     }
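
Illustration (not part of the patch): the loop above relies on the new ShuffleClient contract, under which fetchSegmentFile() returns a directory that already contains the unzipped segment files, so the merge task no longer manages zipped temp files. A caller-side sketch of that contract:

    package org.apache.druid.indexing.common.task.batch.parallel;

    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Sketch: fetch every location of one (interval, bucketId) pair and collect the
    // unzipped directories; unzipping (or skipping it for deep storage) is now the
    // client's responsibility, not the caller's.
    final class FetchBucketSketch
    {
      private FetchBucketSketch() {}

      static List<File> fetchBucket(
          ShuffleClient<PartitionLocation> client,
          File partitionDir,
          String supervisorTaskId,
          List<PartitionLocation> locations
      ) throws IOException
      {
        final List<File> unzippedDirs = new ArrayList<>();
        for (PartitionLocation location : locations) {
          unzippedDirs.add(client.fetchSegmentFile(partitionDir, supervisorTaskId, location));
        }
        return unzippedDirs;
      }
    }
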
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java
index da382ce..fc62eda 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionLocation.java
@@ -19,133 +19,24 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.joda.time.Interval;
 
-import java.net.URI;
-import java.util.Objects;
-
 /**
- * This class represents the intermediary data server where the partition of {@link #interval} and
+ * This class represents the intermediary data server where the partition of {@link #getInterval()} and
  * {@link #getBucketId()} is stored.
  */
-abstract class PartitionLocation<T>
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GenericPartitionLocation.class)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = GenericPartitionStat.TYPE, value = GenericPartitionLocation.class),
+    @JsonSubTypes.Type(name = DeepStoragePartitionStat.TYPE, value = DeepStoragePartitionLocation.class)
+})
+public interface PartitionLocation
 {
-  private final String host;
-  private final int port;
-  private final boolean useHttps;
-  private final String subTaskId;
-  private final Interval interval;
-  private final T secondaryPartition;
-
-  PartitionLocation(
-      String host,
-      int port,
-      boolean useHttps,
-      String subTaskId,
-      Interval interval,
-      T secondaryPartition
-  )
-  {
-    this.host = host;
-    this.port = port;
-    this.useHttps = useHttps;
-    this.subTaskId = subTaskId;
-    this.interval = interval;
-    this.secondaryPartition = secondaryPartition;
-  }
-
-  @JsonProperty
-  public String getHost()
-  {
-    return host;
-  }
-
-  @JsonProperty
-  public int getPort()
-  {
-    return port;
-  }
-
-  @JsonProperty
-  public boolean isUseHttps()
-  {
-    return useHttps;
-  }
-
-  @JsonProperty
-  public String getSubTaskId()
-  {
-    return subTaskId;
-  }
-
-  @JsonProperty
-  public Interval getInterval()
-  {
-    return interval;
-  }
-
-  @JsonIgnore
-  public T getSecondaryPartition()
-  {
-    return secondaryPartition;
-  }
-
-  abstract int getBucketId();
-
-  final URI toIntermediaryDataServerURI(String supervisorTaskId)
-  {
-    return URI.create(
-        StringUtils.format(
-            "%s://%s:%d/druid/worker/v1/shuffle/task/%s/%s/partition?startTime=%s&endTime=%s&bucketId=%d",
-            useHttps ? "https" : "http",
-            host,
-            port,
-            StringUtils.urlEncode(supervisorTaskId),
-            StringUtils.urlEncode(subTaskId),
-            interval.getStart(),
-            interval.getEnd(),
-            getBucketId()
-        )
-    );
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    PartitionLocation<?> that = (PartitionLocation<?>) o;
-    return port == that.port &&
-           useHttps == that.useHttps &&
-           Objects.equals(host, that.host) &&
-           Objects.equals(subTaskId, that.subTaskId) &&
-           Objects.equals(interval, that.interval) &&
-           Objects.equals(secondaryPartition, that.secondaryPartition);
-  }
-
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(host, port, useHttps, subTaskId, interval, secondaryPartition);
-  }
-
-  @Override
-  public String toString()
-  {
-    return "PartitionLocation{" +
-           "host='" + host + '\'' +
-           ", port=" + port +
-           ", useHttps=" + useHttps +
-           ", subTaskId='" + subTaskId + '\'' +
-           ", interval=" + interval +
-           ", secondaryPartition=" + secondaryPartition +
-           '}';
-  }
+  int getBucketId();
+  Interval getInterval();
+  BuildingShardSpec getShardSpec();
+  String getSubTaskId();
 }
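
Illustration (not part of the patch): a sketch of constructing the two PartitionLocation implementations. The constructor shapes are taken from other parts of this diff (the removed supervisor code above for GenericPartitionLocation, DeepStoragePartitionLocationTest below for DeepStoragePartitionLocation); hosts, ports, and loadSpec values are placeholders.

    package org.apache.druid.indexing.common.task.batch.parallel;

    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.timeline.partition.BuildingShardSpec;
    import org.joda.time.Interval;

    // Sketch: the two PartitionLocation implementations and the information each one needs.
    final class PartitionLocationSketch
    {
      private PartitionLocationSketch() {}

      static PartitionLocation httpLocation(String subTaskId, BuildingShardSpec<?> shardSpec)
      {
        final Interval interval = Intervals.of("2021-08-01/2021-08-02");
        // the merge task streams this partition over HTTP(S) from the worker that built it
        return new GenericPartitionLocation("worker.example.com", 8091, false, subTaskId, interval, shardSpec);
      }

      static PartitionLocation deepStorageLocation(String subTaskId, BuildingShardSpec<?> shardSpec)
      {
        final Interval interval = Intervals.of("2021-08-01/2021-08-02");
        // the merge task pulls this partition from deep storage using the recorded loadSpec
        return new DeepStoragePartitionLocation(
            subTaskId,
            interval,
            shardSpec,
            ImmutableMap.of("path", "/example/shuffle-data/partition/index.zip")
        );
      }
    }
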
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java
index c7f1a55..4ef5887 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartitionStat.java
@@ -19,119 +19,41 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
-import java.util.Objects;
-
 /**
  * Statistics about a partition created by {@link PartialSegmentGenerateTask}. Each partition is a
  * set of data of the same time chunk (primary partition key) and the same secondary partition key
- * ({@link T}). This class holds the statistics of a single partition created by a task.
+ * ({@link BucketNumberedShardSpec}). This class holds the statistics of a single partition created by a task.
  */
-abstract class PartitionStat<T>
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = GenericPartitionStat.class)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = GenericPartitionStat.TYPE, value = GenericPartitionStat.class),
+    @JsonSubTypes.Type(name = DeepStoragePartitionStat.TYPE, value = DeepStoragePartitionStat.class)
+})
+public interface PartitionStat
 {
-  // Host and port of the task executor
-  private final String taskExecutorHost;
-  private final int taskExecutorPort;
-  private final boolean useHttps;
-
-  // Primary partition key
-  private final Interval interval;
-
-  // numRows and sizeBytes are always null currently and will be filled properly in the future.
-  @Nullable
-  private final Integer numRows;
-  @Nullable
-  private final Long sizeBytes;
-
-  PartitionStat(
-      String taskExecutorHost,
-      int taskExecutorPort,
-      boolean useHttps,
-      Interval interval,
-      @Nullable Integer numRows,
-      @Nullable Long sizeBytes
-  )
-  {
-    this.taskExecutorHost = taskExecutorHost;
-    this.taskExecutorPort = taskExecutorPort;
-    this.useHttps = useHttps;
-    this.interval = interval;
-    this.numRows = numRows == null ? 0 : numRows;
-    this.sizeBytes = sizeBytes == null ? 0 : sizeBytes;
-  }
-
-  @JsonProperty
-  public final String getTaskExecutorHost()
-  {
-    return taskExecutorHost;
-  }
-
-  @JsonProperty
-  public final int getTaskExecutorPort()
-  {
-    return taskExecutorPort;
-  }
-
-  @JsonProperty
-  public final boolean isUseHttps()
-  {
-    return useHttps;
-  }
-
-  @JsonProperty
-  public final Interval getInterval()
-  {
-    return interval;
-  }
-
-  @Nullable
-  @JsonProperty
-  public final Integer getNumRows()
-  {
-    return numRows;
-  }
-
-  @Nullable
-  @JsonProperty
-  public final Long getSizeBytes()
-  {
-    return sizeBytes;
-  }
-
   /**
    * @return Uniquely identifying index from 0..N-1 of the N partitions
    */
-  abstract int getBucketId();
+  int getBucketId();
 
   /**
    * @return Definition of secondary partition. For example, for range partitioning, this should include the start/end.
    */
-  abstract T getSecondaryPartition();
+  BucketNumberedShardSpec getSecondaryPartition();
 
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    PartitionStat that = (PartitionStat) o;
-    return taskExecutorPort == that.taskExecutorPort &&
-           useHttps == that.useHttps &&
-           Objects.equals(taskExecutorHost, that.taskExecutorHost) &&
-           Objects.equals(interval, that.interval) &&
-           Objects.equals(numRows, that.numRows) &&
-           Objects.equals(sizeBytes, that.sizeBytes);
-  }
+  /**
+   * @return interval for the partition
+   */
+  Interval getInterval();
 
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(taskExecutorHost, taskExecutorPort, useHttps, interval, numRows, sizeBytes);
-  }
+  /**
+   * Converts partition stat to PartitionLocation
+   * */
+  PartitionLocation toPartitionLocation(String subtaskId, BuildingShardSpec shardSpec);
 }
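
Illustration (not part of the patch): a sketch of what each PartitionStat flavor records about a partial segment, mirroring the generatePartitionStat() implementations further down in this diff; concrete hosts and loadSpec values are placeholders.

    package org.apache.druid.indexing.common.task.batch.parallel;

    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
    import org.joda.time.Interval;

    // Sketch: the local (HTTP) stat remembers which task executor holds the zipped
    // partition, while the deep-storage stat remembers the pushed segment's loadSpec.
    final class PartitionStatSketch
    {
      private PartitionStatSketch() {}

      static PartitionStat httpStat(Interval interval, BucketNumberedShardSpec<?> bucketSpec)
      {
        return new GenericPartitionStat(
            "worker.example.com",
            8091,
            false,
            interval,
            bucketSpec,
            null, // numRows is not supported yet
            null  // sizeBytes is not supported yet
        );
      }

      static PartitionStat deepStorageStat(Interval interval, BucketNumberedShardSpec<?> bucketSpec)
      {
        return new DeepStoragePartitionStat(
            interval,
            bucketSpec,
            ImmutableMap.of("path", "/example/shuffle-data/partition/index.zip")
        );
      }
    }
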
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
index 2b33f0f..2da097b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ShuffleClient.java
@@ -34,13 +34,14 @@ import java.io.IOException;
  * @see PartialSegmentMergeTask
  */
 @ExtensionPoint
-public interface ShuffleClient
+public interface ShuffleClient<P extends PartitionLocation>
 {
   /**
    * Fetch the segment file into the local storage for the given supervisorTaskId and the location.
    * If the segment file should be fetched from a remote site, the returned file will be created under the given
    * partitionDir. Otherwise, the returned file can be located in any path.
+   * @return dir containing the unzipped segment files
    */
-  <T, P extends PartitionLocation<T>> File fetchSegmentFile(File partitionDir, String supervisorTaskId, P location)
+  File fetchSegmentFile(File partitionDir, String supervisorTaskId, P location)
       throws IOException;
 }
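
Illustration (not part of the patch): a toy ShuffleClient that satisfies the new contract. The resolver function is a hypothetical stand-in for the real transport (an HTTP pull from a worker, or a deep-storage download); the point is that the client, not the caller, hands back an unzipped directory.

    package org.apache.druid.indexing.common.task.batch.parallel;

    import org.apache.commons.io.FileUtils;
    import org.apache.druid.java.util.common.StringUtils;
    import org.apache.druid.utils.CompressionUtils;

    import java.io.File;
    import java.io.IOException;
    import java.util.function.Function;

    // Sketch: whatever the transport is, fetchSegmentFile() must return a directory of
    // already-unzipped segment files.
    final class LocalZipShuffleClient implements ShuffleClient<PartitionLocation>
    {
      // hypothetical: maps a location to a locally reachable zip of that partition
      private final Function<PartitionLocation, File> zipResolver;

      LocalZipShuffleClient(Function<PartitionLocation, File> zipResolver)
      {
        this.zipResolver = zipResolver;
      }

      @Override
      public File fetchSegmentFile(File partitionDir, String supervisorTaskId, PartitionLocation location)
          throws IOException
      {
        final File zippedFile = zipResolver.apply(location);
        final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
        FileUtils.forceMkdir(unzippedDir);
        CompressionUtils.unzip(zippedFile, unzippedDir);
        return unzippedDir;
      }
    }
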
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
new file mode 100644
index 0000000..34199fc
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.io.ByteSource;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+
+public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
+{
+  public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
+  private final DataSegmentPusher dataSegmentPusher;
+
+  @Inject
+  public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher)
+  {
+    this.dataSegmentPusher = dataSegmentPusher;
+  }
+
+  @Override
+  public void start()
+  {
+    // nothing
+  }
+
+  @Override
+  public void stop()
+  {
+    // nothing
+  }
+
+  @Override
+  public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+      throws IOException
+  {
+    if (!(segment.getShardSpec() instanceof BucketNumberedShardSpec)) {
+      throw new IAE(
+          "Invalid shardSpec type. Expected [%s] but got [%s]",
+          BucketNumberedShardSpec.class.getName(),
+          segment.getShardSpec().getClass().getName()
+      );
+    }
+    final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
+    final String partitionFilePath = getPartitionFilePath(
+        supervisorTaskId,
+        subTaskId,
+        segment.getInterval(),
+        bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
+    );
+    return dataSegmentPusher.pushToPath(segmentDir, segment, SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath);
+  }
+
+  @Override
+  public DeepStoragePartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment)
+  {
+    return new DeepStoragePartitionStat(
+        segment.getInterval(),
+        (BucketNumberedShardSpec) segment.getShardSpec(),
+        segment.getLoadSpec()
+    );
+  }
+
+  @Nullable
+  @Override
+  public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
+  {
+    throw new UnsupportedOperationException("Not supported, get partition file using segment loadspec");
+  }
+
+  @Override
+  public void deletePartitions(String supervisorTaskId)
+  {
+    throw new UnsupportedOperationException("Not supported");
+  }
+}
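
Illustration (not part of the patch): the deep-storage shuffle write path end to end, using only the class above. It assumes the pushToPath() method this commit adds to DataSegmentPusher; the toolbox argument is unused by this implementation, so the sketch passes null.

    package org.apache.druid.indexing.worker.shuffle;

    import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat;
    import org.apache.druid.segment.loading.DataSegmentPusher;
    import org.apache.druid.timeline.DataSegment;

    import java.io.File;
    import java.io.IOException;

    // Sketch: push a partial segment to deep storage under the shuffle-data prefix, then
    // describe it with a DeepStoragePartitionStat carrying the pushed loadSpec.
    final class DeepStorageShuffleWriteSketch
    {
      private DeepStorageShuffleWriteSketch() {}

      static DeepStoragePartitionStat pushAndDescribe(
          DataSegmentPusher pusher,
          String supervisorTaskId,
          String subTaskId,
          DataSegment segment,   // must carry a BucketNumberedShardSpec, or addSegment throws IAE
          File segmentDir
      ) throws IOException
      {
        final DeepStorageIntermediaryDataManager manager = new DeepStorageIntermediaryDataManager(pusher);
        // writes under "shuffle-data/<supervisorTaskId>/<start>/<end>/<bucketId>/<subTaskId>"
        final DataSegment pushed = manager.addSegment(supervisorTaskId, subTaskId, segment, segmentDir);
        // the returned segment carries the deep-storage loadSpec, which goes into the report
        return manager.generatePartitionStat(null, pushed);
      }
    }
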
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
index a903bfc..17f898a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
@@ -21,12 +21,15 @@ package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.io.ByteSource;
 import org.apache.druid.guice.annotations.ExtensionPoint;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.batch.parallel.PartitionStat;
 import org.apache.druid.indexing.common.task.batch.parallel.ShuffleClient;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.Optional;
 
 /**
@@ -43,6 +46,10 @@ import java.util.Optional;
 @ExtensionPoint
 public interface IntermediaryDataManager
 {
+  void start();
+
+  void stop();
+
   /**
    * Write a segment into one of configured locations
    *
@@ -51,9 +58,9 @@ public interface IntermediaryDataManager
    * @param segment - Segment to write
    * @param segmentDir - Directory of the segment to write
    *
-   * @return size of the writen segment
+   * @return the written segment
    */
-  long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException;
+  DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException;
 
   /**
    * Find the partition file. Note that the returned ByteSource method size() should be fast.
@@ -74,4 +81,30 @@ public interface IntermediaryDataManager
    *
    */
   void deletePartitions(String supervisorTaskId) throws IOException;
+
+  PartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment);
+
+  default String getPartitionFilePath(
+      String supervisorTaskId,
+      String subTaskId,
+      Interval interval,
+      int bucketId
+  )
+  {
+    return Paths.get(getPartitionDirPath(supervisorTaskId, interval, bucketId), subTaskId).toString();
+  }
+
+  default String getPartitionDirPath(
+      String supervisorTaskId,
+      Interval interval,
+      int bucketId
+  )
+  {
+    return Paths.get(
+        supervisorTaskId,
+        interval.getStart().toString(),
+        interval.getEnd().toString(),
+        String.valueOf(bucketId)
+    ).toString();
+  }
 }
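
Illustration (not part of the patch): what the shared default path helpers produce. Both the local and the deep-storage managers now lay out partitions the same way; the manager instance below exists only to reach the interface defaults.

    package org.apache.druid.indexing.worker.shuffle;

    import org.apache.druid.java.util.common.Intervals;
    import org.joda.time.Interval;

    // Sketch: print the relative partition paths for one bucket.
    final class PartitionPathSketch
    {
      private PartitionPathSketch() {}

      public static void main(String[] args)
      {
        // any IntermediaryDataManager works here; only the interface defaults are exercised
        final IntermediaryDataManager manager = new DeepStorageIntermediaryDataManager(null);
        final Interval interval = Intervals.of("2021-08-01/2021-08-02");
        // -> supervisor-id/2021-08-01T00:00:00.000Z/2021-08-02T00:00:00.000Z/3
        System.out.println(manager.getPartitionDirPath("supervisor-id", interval, 3));
        // -> supervisor-id/2021-08-01T00:00:00.000Z/2021-08-02T00:00:00.000Z/3/subtask-id
        System.out.println(manager.getPartitionFilePath("supervisor-id", "subtask-id", interval, 3));
      }
    }
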
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index 7331cf6..feaa867 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.worker.shuffle;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Iterators;
 import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
@@ -29,7 +30,9 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskStatus;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.config.TaskConfig;
+import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -40,6 +43,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.StorageLocation;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -52,7 +56,6 @@ import org.joda.time.Period;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -124,6 +127,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
     this.indexingServiceClient = indexingServiceClient;
   }
 
+  @Override
   @LifecycleStart
   public void start()
   {
@@ -162,12 +166,18 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
     );
   }
 
+  @Override
   @LifecycleStop
-  public void stop() throws InterruptedException
+  public void stop()
   {
     if (supervisorTaskChecker != null) {
       supervisorTaskChecker.shutdownNow();
-      supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
+      try {
+        supervisorTaskChecker.awaitTermination(10, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e) {
+        Throwables.propagate(e);
+      }
     }
     supervisorTaskCheckTimes.clear();
   }
@@ -268,7 +278,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
    * supervisorTaskId.
    */
   @Override
-  public long addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+  public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
       throws IOException
   {
     // Get or create the location iterator for supervisorTask.
@@ -341,7 +351,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
                 subTaskId,
                 destFile
             );
-            return unzippedSizeBytes;
+            return segment.withSize(unzippedSizeBytes).withBinaryVersion(SegmentUtils.getVersionFromDir(segmentDir));
           }
           catch (Exception e) {
             location.release(partitionFilePath, tempZippedFile.length());
@@ -364,7 +374,7 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
   {
     IdUtils.validateId("supervisorTaskId", supervisorTaskId);
     for (StorageLocation location : shuffleDataLocations) {
-      final File partitionDir = new File(location.getPath(), getPartitionDir(supervisorTaskId, interval, bucketId));
+      final File partitionDir = new File(location.getPath(), getPartitionDirPath(supervisorTaskId, interval, bucketId));
       if (partitionDir.exists()) {
         supervisorTaskCheckTimes.put(supervisorTaskId, getExpiryTimeFromNow());
         final File[] segmentFiles = partitionDir.listFiles();
@@ -384,6 +394,20 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
     return Optional.empty();
   }
 
+  @Override
+  public GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment)
+  {
+    return new GenericPartitionStat(
+        toolbox.getTaskExecutorNode().getHost(),
+        toolbox.getTaskExecutorNode().getPortToUse(),
+        toolbox.getTaskExecutorNode().isEnableTlsPort(),
+        segment.getInterval(),
+        (BucketNumberedShardSpec) segment.getShardSpec(),
+        null, // numRows is not supported yet
+        null  // sizeBytes is not supported yet
+    );
+  }
+
   private DateTime getExpiryTimeFromNow()
   {
     return DateTimes.nowUtc().plus(intermediaryPartitionTimeout);
@@ -405,28 +429,4 @@ public class LocalIntermediaryDataManager implements IntermediaryDataManager
     }
     supervisorTaskCheckTimes.remove(supervisorTaskId);
   }
-
-  private static String getPartitionFilePath(
-      String supervisorTaskId,
-      String subTaskId,
-      Interval interval,
-      int bucketId
-  )
-  {
-    return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
-  }
-
-  private static String getPartitionDir(
-      String supervisorTaskId,
-      Interval interval,
-      int bucketId
-  )
-  {
-    return Paths.get(
-        supervisorTaskId,
-        interval.getStart().toString(),
-        interval.getEnd().toString(),
-        String.valueOf(bucketId)
-    ).toString();
-  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
index 6bc83ba..e1726d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusher.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.worker.shuffle;
 
-import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
 
@@ -64,9 +63,7 @@ public class ShuffleDataSegmentPusher implements DataSegmentPusher
   @Override
   public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException
   {
-    final long unzippedSize = intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file);
-    return segment.withSize(unzippedSize)
-                  .withBinaryVersion(SegmentUtils.getVersionFromDir(file));
+    return intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, file);
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index b2bd19a..82bc1f3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -95,6 +95,7 @@ import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CompressionUtils;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -702,7 +703,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
     }
   }
 
-  static class LocalShuffleClient implements ShuffleClient
+  static class LocalShuffleClient implements ShuffleClient<GenericPartitionLocation>
   {
     private final IntermediaryDataManager intermediaryDataManager;
 
@@ -712,10 +713,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
     }
 
     @Override
-    public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
+    public File fetchSegmentFile(
         File partitionDir,
         String supervisorTaskId,
-        P location
+        GenericPartitionLocation location
     ) throws IOException
     {
       final java.util.Optional<ByteSource> zippedFile = intermediaryDataManager.findPartitionFile(
@@ -732,7 +733,17 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
           fetchedFile,
           out -> zippedFile.get().copyTo(out)
       );
-      return fetchedFile;
+      final File unzippedDir = new File(partitionDir, StringUtils.format("unzipped_%s", location.getSubTaskId()));
+      try {
+        org.apache.commons.io.FileUtils.forceMkdir(unzippedDir);
+        CompressionUtils.unzip(fetchedFile, unzippedDir);
+      }
+      finally {
+        if (!fetchedFile.delete()) {
+          LOG.warn("Failed to delete temp file[%s]", zippedFile);
+        }
+      }
+      return unzippedDir;
     }
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocationTest.java
similarity index 58%
rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfigTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocationTest.java
index c96adb8..de51d7d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIOConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocationTest.java
@@ -20,30 +20,28 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collections;
-
-public class PartialGenericSegmentMergeIOConfigTest
+public class DeepStoragePartitionLocationTest
 {
   private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
-  private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation(
-      ParallelIndexTestingFactory.HOST,
-      ParallelIndexTestingFactory.PORT,
-      ParallelIndexTestingFactory.USE_HTTPS,
-      ParallelIndexTestingFactory.SUBTASK_ID,
-      ParallelIndexTestingFactory.INTERVAL,
-      ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC
-  );
 
-  private PartialGenericSegmentMergeIOConfig target;
+  private DeepStoragePartitionLocation target;
 
   @Before
   public void setup()
   {
-    target = new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION));
+    target = new DeepStoragePartitionLocation(
+        ParallelIndexTestingFactory.SUBTASK_ID,
+        ParallelIndexTestingFactory.INTERVAL,
+        ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC,
+        ImmutableMap.of("path", "/test/path")
+    );
   }
 
   @Test
@@ -51,4 +49,19 @@ public class PartialGenericSegmentMergeIOConfigTest
   {
     TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
   }
+
+  @Test
+  public void hasPartitionIdThatMatchesShardSpec()
+  {
+    Assert.assertEquals(ParallelIndexTestingFactory.PARTITION_ID, target.getBucketId());
+  }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(DeepStoragePartitionLocation.class)
+                  .withNonnullFields("subTaskId", "interval", "shardSpec", "loadSpec")
+                  .usingGetClass()
+                  .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStatTest.java
similarity index 79%
copy from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
copy to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStatTest.java
index 276b9ad..0b9bdb3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStatTest.java
@@ -20,6 +20,8 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.timeline.partition.HashBucketShardSpec;
 import org.apache.druid.timeline.partition.HashPartitionFunction;
@@ -29,19 +31,16 @@ import org.junit.Test;
 
 import java.util.Collections;
 
-public class GenericPartitionStatTest
+public class DeepStoragePartitionStatTest
 {
   private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
 
-  private GenericPartitionStat target;
+  private DeepStoragePartitionStat target;
 
   @Before
   public void setup()
   {
-    target = new GenericPartitionStat(
-        ParallelIndexTestingFactory.TASK_EXECUTOR_HOST,
-        ParallelIndexTestingFactory.TASK_EXECUTOR_PORT,
-        ParallelIndexTestingFactory.USE_HTTPS,
+    target = new DeepStoragePartitionStat(
         ParallelIndexTestingFactory.INTERVAL,
         new HashBucketShardSpec(
             ParallelIndexTestingFactory.PARTITION_ID,
@@ -50,8 +49,7 @@ public class GenericPartitionStatTest
             HashPartitionFunction.MURMUR3_32_ABS,
             new ObjectMapper()
         ),
-        ParallelIndexTestingFactory.NUM_ROWS,
-        ParallelIndexTestingFactory.SIZE_BYTES
+        ImmutableMap.of("path", "/dummy/index.zip")
     );
   }
 
@@ -66,4 +64,13 @@ public class GenericPartitionStatTest
   {
     Assert.assertEquals(target.getSecondaryPartition().getBucketId(), target.getBucketId());
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(DeepStoragePartitionStat.class)
+                  .withNonnullFields("interval", "shardSpec", "loadSpec")
+                  .usingGetClass()
+                  .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClientTest.java
new file mode 100644
index 0000000..4438677
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStorageShuffleClientTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Injector;
+import org.apache.druid.guice.GuiceAnnotationIntrospector;
+import org.apache.druid.guice.GuiceInjectableValues;
+import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.utils.CompressionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+public class DeepStorageShuffleClientTest
+{
+  private DeepStorageShuffleClient deepStorageShuffleClient;
+  private ObjectMapper mapper;
+  private File segmentFile;
+  private String segmentFileName;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+
+  @Before
+  public void setUp() throws Exception
+  {
+    final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
+        ImmutableList.of(
+            binder -> binder.bind(LocalDataSegmentPuller.class)
+        )
+    );
+    mapper = new DefaultObjectMapper();
+    mapper.registerModule(new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class));
+    mapper.setInjectableValues(new GuiceInjectableValues(injector));
+
+    final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
+    mapper.setAnnotationIntrospectors(
+        new AnnotationIntrospectorPair(guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()),
+        new AnnotationIntrospectorPair(guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector())
+    );
+    deepStorageShuffleClient = new DeepStorageShuffleClient(mapper);
+
+    File temp = temporaryFolder.newFile();
+    segmentFileName = temp.getName();
+    try (Writer writer = Files.newBufferedWriter(temp.toPath(), StandardCharsets.UTF_8)) {
+      for (int j = 0; j < 10; j++) {
+        writer.write(StringUtils.format("let's write some data.\n"));
+      }
+    }
+    segmentFile = new File(temp.getAbsolutePath() + ".zip");
+    CompressionUtils.zip(segmentFile.getParentFile(), segmentFile);
+  }
+
+  @Test
+  public void fetchSegmentFile() throws IOException
+  {
+    File partitionDir = temporaryFolder.newFolder();
+    String subTaskId = "subTask";
+    File unzippedDir = deepStorageShuffleClient.fetchSegmentFile(
+        partitionDir,
+        "testSupervisor",
+        new DeepStoragePartitionLocation(
+            subTaskId,
+            Intervals.of("2000/2099"),
+            null,
+            ImmutableMap.of("type", "local", "path", segmentFile.getAbsolutePath())
+        )
+    );
+    Assert.assertEquals(
+        StringUtils.format("%s/unzipped_%s", partitionDir.getAbsolutePath(), subTaskId),
+        unzippedDir.getAbsolutePath()
+    );
+    File fetchedSegmentFile = unzippedDir.listFiles((dir, name) -> name.endsWith(".tmp"))[0];
+    Assert.assertEquals(segmentFileName, fetchedSegmentFile.getName());
+    Assert.assertTrue(fetchedSegmentFile.length() > 0);
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java
index 4e46e38..20643a2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionLocationTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.segment.TestHelper;
 import org.junit.Assert;
 import org.junit.Before;
@@ -55,4 +56,13 @@ public class GenericPartitionLocationTest
   {
     Assert.assertEquals(ParallelIndexTestingFactory.PARTITION_ID, target.getBucketId());
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(GenericPartitionLocation.class)
+                  .withNonnullFields("host", "port", "useHttps", "subTaskId", "interval", "shardSpec")
+                  .usingGetClass()
+                  .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
index 276b9ad..a93adc1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/GenericPartitionStatTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.timeline.partition.HashBucketShardSpec;
 import org.apache.druid.timeline.partition.HashPartitionFunction;
@@ -66,4 +67,13 @@ public class GenericPartitionStatTest
   {
     Assert.assertEquals(target.getSecondaryPartition().getBucketId(), target.getBucketId());
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(GenericPartitionStat.class)
+                  .withNonnullFields("taskExecutorHost", "taskExecutorPort", "useHttps", "interval", "shardSpec")
+                  .usingGetClass()
+                  .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java
index 3ddb63b..34e2b33 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HttpShuffleClientTest.java
@@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.utils.CompressionUtils;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -66,12 +67,14 @@ public class HttpShuffleClientTest
   @Before
   public void setup() throws IOException
   {
-    segmentFile = temporaryFolder.newFile();
-    try (Writer writer = Files.newBufferedWriter(segmentFile.toPath(), StandardCharsets.UTF_8)) {
+    File temp = temporaryFolder.newFile();
+    try (Writer writer = Files.newBufferedWriter(temp.toPath(), StandardCharsets.UTF_8)) {
       for (int j = 0; j < 10; j++) {
         writer.write(StringUtils.format("let's write some data.\n"));
       }
     }
+    segmentFile = new File(temp.getAbsolutePath() + ".zip");
+    CompressionUtils.zip(segmentFile.getParentFile(), segmentFile);
   }
 
   @Test
@@ -195,17 +198,17 @@ public class HttpShuffleClientTest
     return new HttpShuffleClient(httpClient);
   }
 
-  private static class TestPartitionLocation extends PartitionLocation<Integer>
+  private static class TestPartitionLocation extends GenericPartitionLocation
   {
     private TestPartitionLocation()
     {
-      super(HOST, PORT, false, SUBTASK_ID, INTERVAL, PARTITION_ID);
+      super(HOST, PORT, false, SUBTASK_ID, INTERVAL, null);
     }
 
     @Override
-    int getBucketId()
+    public int getBucketId()
     {
-      return getSecondaryPartition();
+      return PARTITION_ID;
     }
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index a28fb1e..8790da0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Ordering;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -69,35 +70,43 @@ public class ParallelIndexSupervisorTaskTest
   public static class CreateMergeIoConfigsTest
   {
     private static final int TOTAL_NUM_MERGE_TASKS = 10;
-    private static final Function<List<GenericPartitionLocation>, PartialGenericSegmentMergeIOConfig>
-        CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialGenericSegmentMergeIOConfig::new;
+    private static final Function<List<PartitionLocation>, PartialSegmentMergeIOConfig>
+        CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG = PartialSegmentMergeIOConfig::new;
 
-    @Parameterized.Parameters(name = "count = {0}")
-    public static Iterable<? extends Object> data()
+    @Parameterized.Parameters(name = "count = {0}, partitionLocationType = {1}")
+    public static Iterable<? extends Object[]> data()
     {
       // different scenarios for last (index = 10 - 1 = 9) partition:
       return Arrays.asList(
-          20,  // even partitions per task: round(20 / 10) * (10 - 1) = 2 * 9 = 18 < 20
-          24,  // round down:               round(24 / 10) * (10 - 1) = 2 * 9 = 18 < 24
-          25,  // round up to greater:      round(25 / 10) * (10 - 1) = 3 * 9 = 27 > 25 (index out of bounds)
-          27   // round up to equal:        round(27 / 10) * (10 - 1) = 3 * 9 = 27 == 27 (empty partition)
+          new Object[][]{
+              {20, GenericPartitionStat.TYPE},  // even partitions per task: round(20 / 10) * (10 - 1) = 2 * 9 = 18 < 20
+              {24, DeepStoragePartitionStat.TYPE},  // round down:               round(24 / 10) * (10 - 1) = 2 * 9 = 18 < 24
+              {25, GenericPartitionStat.TYPE},  // round up to greater:      round(25 / 10) * (10 - 1) = 3 * 9 = 27 > 25 (index out of bounds)
+              {27, DeepStoragePartitionStat.TYPE} // round up to equal:        round(27 / 10) * (10 - 1) = 3 * 9 = 27 == 27 (empty partition)
+          }
       );
     }
 
-    @Parameterized.Parameter
+    public CreateMergeIoConfigsTest(int count, String partitionLocationType)
+    {
+      this.count = count;
+      this.partitionLocationType = partitionLocationType;
+    }
+
     public int count;
+    public String partitionLocationType;
 
     @Test
     public void handlesLastPartitionCorrectly()
     {
-      List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
+      List<PartialSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
       assertNoMissingPartitions(count, assignedPartitionLocation);
     }
 
     @Test
     public void sizesPartitionsEvenly()
     {
-      List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
+      List<PartialSegmentMergeIOConfig> assignedPartitionLocation = createMergeIOConfigs();
       List<Integer> actualPartitionSizes = assignedPartitionLocation.stream()
                                                                     .map(i -> i.getPartitionLocations().size())
                                                                     .collect(Collectors.toList());
@@ -113,42 +122,56 @@ public class ParallelIndexSupervisorTaskTest
       );
     }
 
-    private List<PartialGenericSegmentMergeIOConfig> createMergeIOConfigs()
+    private List<PartialSegmentMergeIOConfig> createMergeIOConfigs()
     {
       return ParallelIndexSupervisorTask.createMergeIOConfigs(
           TOTAL_NUM_MERGE_TASKS,
-          createPartitionToLocations(count),
+          createPartitionToLocations(count, partitionLocationType),
           CREATE_PARTIAL_SEGMENT_MERGE_IO_CONFIG
       );
     }
 
-    private static Map<Pair<Interval, Integer>, List<GenericPartitionLocation>> createPartitionToLocations(int count)
+    private static Map<Pair<Interval, Integer>, List<PartitionLocation>> createPartitionToLocations(
+        int count,
+        String partitionLocationType
+    )
     {
       return IntStream.range(0, count).boxed().collect(
           Collectors.toMap(
               i -> Pair.of(createInterval(i), i),
-              i -> Collections.singletonList(createPartitionLocation(i))
+              i -> Collections.singletonList(createPartitionLocation(i, partitionLocationType))
           )
       );
     }
 
-    private static GenericPartitionLocation createPartitionLocation(int id)
+    private static PartitionLocation createPartitionLocation(int id, String partitionLocationType)
     {
-      return new GenericPartitionLocation(
-          "host",
-          0,
-          false,
-          "subTaskId",
-          createInterval(id),
-          new BuildingHashBasedNumberedShardSpec(
-              id,
-              id,
-              id + 1,
-              null,
-              HashPartitionFunction.MURMUR3_32_ABS,
-              new ObjectMapper()
-          )
-      );
+      if (DeepStoragePartitionStat.TYPE.equals(partitionLocationType)) {
+        return new DeepStoragePartitionLocation("", Intervals.of("2000/2099"), new BuildingHashBasedNumberedShardSpec(
+            id,
+            id,
+            id + 1,
+            null,
+            HashPartitionFunction.MURMUR3_32_ABS,
+            new ObjectMapper()
+        ), ImmutableMap.of());
+      } else {
+        return new GenericPartitionLocation(
+            "host",
+            0,
+            false,
+            "subTaskId",
+            createInterval(id),
+            new BuildingHashBasedNumberedShardSpec(
+                id,
+                id,
+                id + 1,
+                null,
+                HashPartitionFunction.MURMUR3_32_ABS,
+                new ObjectMapper()
+            )
+        );
+      }
     }
 
     private static Interval createInterval(int id)
@@ -158,7 +181,7 @@ public class ParallelIndexSupervisorTaskTest
 
     private static void assertNoMissingPartitions(
         int count,
-        List<PartialGenericSegmentMergeIOConfig> assignedPartitionLocation
+        List<PartialSegmentMergeIOConfig> assignedPartitionLocation
     )
     {
       List<Integer> expectedIds = IntStream.range(0, count).boxed().collect(Collectors.toList());
@@ -167,7 +190,7 @@ public class ParallelIndexSupervisorTaskTest
                                                          .flatMap(
                                                              i -> i.getPartitionLocations()
                                                                    .stream()
-                                                                   .map(GenericPartitionLocation::getBucketId)
+                                                                   .map(PartitionLocation::getBucketId)
                                                          )
                                                          .sorted()
                                                          .collect(Collectors.toList());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index 9e38195..dc6537f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -77,10 +77,10 @@ class ParallelIndexTestingFactory
   static final ShuffleClient SHUFFLE_CLIENT = new ShuffleClient()
   {
     @Override
-    public <T, P extends PartitionLocation<T>> File fetchSegmentFile(
+    public File fetchSegmentFile(
         File partitionDir,
         String supervisorTaskId,
-        P location
+        PartitionLocation location
     )
     {
       return null;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
index b98f09e..97930b0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeTaskTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.segment.TestHelper;
 import org.hamcrest.Matchers;
@@ -27,11 +28,27 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
 import java.util.Collections;
 
+@RunWith(Parameterized.class)
 public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSupervisorTaskTest
 {
+  @Parameterized.Parameters(name = "partitionLocation = {0}")
+  public static Iterable<? extends Object> data()
+  {
+    return Arrays.asList(
+        GENERIC_PARTITION_LOCATION,
+        DEEP_STORE_PARTITION_LOCATION
+    );
+  }
+
+  @Parameterized.Parameter
+  public PartitionLocation partitionLocation;
+
   private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation(
       ParallelIndexTestingFactory.HOST,
       ParallelIndexTestingFactory.PORT,
@@ -40,26 +57,21 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
       ParallelIndexTestingFactory.INTERVAL,
       ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC
   );
-  private static final PartialGenericSegmentMergeIOConfig IO_CONFIG =
-      new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION));
-  private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec(
-      null,
-      1,
-      Collections.emptyList()
+
+  private static final DeepStoragePartitionLocation DEEP_STORE_PARTITION_LOCATION = new DeepStoragePartitionLocation(
+      ParallelIndexTestingFactory.SUBTASK_ID,
+      ParallelIndexTestingFactory.INTERVAL,
+      ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC,
+      ImmutableMap.of()
   );
-  private static final PartialGenericSegmentMergeIngestionSpec INGESTION_SPEC =
-      new PartialGenericSegmentMergeIngestionSpec(
-          ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS),
-          IO_CONFIG,
-          new ParallelIndexTestingFactory.TuningConfigBuilder()
-              .partitionsSpec(PARTITIONS_SPEC)
-              .build()
-      );
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
   private PartialGenericSegmentMergeTask target;
+  private PartialSegmentMergeIOConfig ioConfig;
+  private HashedPartitionsSpec partitionsSpec;
+  private PartialSegmentMergeIngestionSpec ingestionSpec;
 
   public PartialGenericSegmentMergeTaskTest()
   {
@@ -70,6 +82,19 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
   @Before
   public void setup()
   {
+    ioConfig = new PartialSegmentMergeIOConfig(Collections.singletonList(partitionLocation));
+    partitionsSpec = new HashedPartitionsSpec(
+        null,
+        1,
+        Collections.emptyList()
+    );
+    ingestionSpec = new PartialSegmentMergeIngestionSpec(
+        ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS),
+        ioConfig,
+        new ParallelIndexTestingFactory.TuningConfigBuilder()
+            .partitionsSpec(partitionsSpec)
+            .build()
+    );
     target = new PartialGenericSegmentMergeTask(
         ParallelIndexTestingFactory.AUTOMATIC_ID,
         ParallelIndexTestingFactory.GROUP_ID,
@@ -77,7 +102,7 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
         ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
         ParallelIndexTestingFactory.SUBTASK_SPEC_ID,
         ParallelIndexTestingFactory.NUM_ATTEMPTS,
-        INGESTION_SPEC,
+        ingestionSpec,
         ParallelIndexTestingFactory.CONTEXT
     );
   }
@@ -108,11 +133,11 @@ public class PartialGenericSegmentMergeTaskTest extends AbstractParallelIndexSup
         ParallelIndexTestingFactory.SUPERVISOR_TASK_ID,
         ParallelIndexTestingFactory.SUBTASK_SPEC_ID,
         ParallelIndexTestingFactory.NUM_ATTEMPTS,
-        new PartialGenericSegmentMergeIngestionSpec(
+        new PartialSegmentMergeIngestionSpec(
             ParallelIndexTestingFactory.createDataSchema(null),
-            IO_CONFIG,
+            ioConfig,
             new ParallelIndexTestingFactory.TuningConfigBuilder()
-                .partitionsSpec(PARTITIONS_SPEC)
+                .partitionsSpec(partitionsSpec)
                 .build()
         ),
         ParallelIndexTestingFactory.CONTEXT
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfigTest.java
similarity index 54%
copy from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java
copy to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfigTest.java
index c30cc9e..a7901ab 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIOConfigTest.java
@@ -20,15 +20,49 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.segment.TestHelper;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Collection;
 import java.util.Collections;
 
-public class PartialGenericSegmentMergeIngestionSpecTest
+@RunWith(Parameterized.class)
+public class PartialSegmentMergeIOConfigTest
 {
+  final PartitionLocation partitionLocation;
+
+  public PartialSegmentMergeIOConfigTest(PartitionLocation partitionLocation)
+  {
+    this.partitionLocation = partitionLocation;
+  }
+
+  @Parameterized.Parameters(name = "partitionLocation = {0}")
+  public static Collection<Object[]> data()
+  {
+    return ImmutableList.of(new Object[]{
+        new GenericPartitionLocation(
+            ParallelIndexTestingFactory.HOST,
+            ParallelIndexTestingFactory.PORT,
+            ParallelIndexTestingFactory.USE_HTTPS,
+            ParallelIndexTestingFactory.SUBTASK_ID,
+            ParallelIndexTestingFactory.INTERVAL,
+            ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC
+        )
+    }, new Object[]{
+        new DeepStoragePartitionLocation(
+            ParallelIndexTestingFactory.SUBTASK_ID,
+            ParallelIndexTestingFactory.INTERVAL,
+            ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC,
+            ImmutableMap.of("path", "/test/path")
+        )
+    });
+  }
+
   private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
   private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation(
       ParallelIndexTestingFactory.HOST,
@@ -38,26 +72,13 @@ public class PartialGenericSegmentMergeIngestionSpecTest
       ParallelIndexTestingFactory.INTERVAL,
       ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC
   );
-  private static final PartialGenericSegmentMergeIOConfig IO_CONFIG =
-      new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION));
-  private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec(
-      null,
-      1,
-      Collections.emptyList()
-  );
 
-  private PartialGenericSegmentMergeIngestionSpec target;
+  private PartialSegmentMergeIOConfig target;
 
   @Before
   public void setup()
   {
-    target = new PartialGenericSegmentMergeIngestionSpec(
-        ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS),
-        IO_CONFIG,
-        new ParallelIndexTestingFactory.TuningConfigBuilder()
-            .partitionsSpec(PARTITIONS_SPEC)
-            .build()
-    );
+    target = new PartialSegmentMergeIOConfig(Collections.singletonList(partitionLocation));
   }
 
   @Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpecTest.java
similarity index 60%
rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpecTest.java
index c30cc9e..04c2819 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialGenericSegmentMergeIngestionSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeIngestionSpecTest.java
@@ -20,16 +20,34 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.segment.TestHelper;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
 import java.util.Collections;
 
-public class PartialGenericSegmentMergeIngestionSpecTest
+@RunWith(Parameterized.class)
+public class PartialSegmentMergeIngestionSpecTest
 {
   private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper();
+
+  @Parameterized.Parameters(name = "partitionLocation = {0}")
+  public static Iterable<? extends Object> data()
+  {
+    return Arrays.asList(
+        GENERIC_PARTITION_LOCATION,
+        DEEP_STORE_PARTITION_LOCATION
+    );
+  }
+
+  @Parameterized.Parameter
+  public PartitionLocation partitionLocation;
+
   private static final GenericPartitionLocation GENERIC_PARTITION_LOCATION = new GenericPartitionLocation(
       ParallelIndexTestingFactory.HOST,
       ParallelIndexTestingFactory.PORT,
@@ -38,24 +56,32 @@ public class PartialGenericSegmentMergeIngestionSpecTest
       ParallelIndexTestingFactory.INTERVAL,
       ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC
   );
-  private static final PartialGenericSegmentMergeIOConfig IO_CONFIG =
-      new PartialGenericSegmentMergeIOConfig(Collections.singletonList(GENERIC_PARTITION_LOCATION));
-  private static final HashedPartitionsSpec PARTITIONS_SPEC = new HashedPartitionsSpec(
-      null,
-      1,
-      Collections.emptyList()
+
+  private static final DeepStoragePartitionLocation DEEP_STORE_PARTITION_LOCATION = new DeepStoragePartitionLocation(
+      ParallelIndexTestingFactory.SUBTASK_ID,
+      ParallelIndexTestingFactory.INTERVAL,
+      ParallelIndexTestingFactory.HASH_BASED_NUMBERED_SHARD_SPEC,
+      ImmutableMap.of()
   );
 
-  private PartialGenericSegmentMergeIngestionSpec target;
+  private PartialSegmentMergeIngestionSpec target;
+  private PartialSegmentMergeIOConfig ioConfig;
+  private HashedPartitionsSpec partitionsSpec;
 
   @Before
   public void setup()
   {
-    target = new PartialGenericSegmentMergeIngestionSpec(
+    ioConfig = new PartialSegmentMergeIOConfig(Collections.singletonList(partitionLocation));
+    partitionsSpec = new HashedPartitionsSpec(
+        null,
+        1,
+        Collections.emptyList()
+    );
+    target = new PartialSegmentMergeIngestionSpec(
         ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS),
-        IO_CONFIG,
+        ioConfig,
         new ParallelIndexTestingFactory.TuningConfigBuilder()
-            .partitionsSpec(PARTITIONS_SPEC)
+            .partitionsSpec(partitionsSpec)
             .build()
     );
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
index 632b1ef..2031b19 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
@@ -112,7 +113,7 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
   }
 
   @After
-  public void teardown() throws InterruptedException
+  public void teardown()
   {
     intermediaryDataManager.stop();
   }
@@ -136,6 +137,7 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
     // Each file size is 138 bytes after compression
     final File segmentDir = tempDir.newFolder();
     FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+    FileUtils.writeByteArrayToFile(new File(segmentDir, "version.bin"), Ints.toByteArray(9));
     return segmentDir;
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java
index 5ad391a..9a5dc8f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerManualAddAndDeleteTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteSource;
+import com.google.common.primitives.Ints;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
@@ -76,7 +77,7 @@ public class LocalIntermediaryDataManagerManualAddAndDeleteTest
         false,
         null,
         null,
-        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)),
+        ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 1200L, null)),
         false,
         false
     );
@@ -86,7 +87,7 @@ public class LocalIntermediaryDataManagerManualAddAndDeleteTest
   }
 
   @After
-  public void teardown() throws InterruptedException
+  public void teardown()
   {
     intermediaryDataManager.stop();
   }
@@ -232,6 +233,7 @@ public class LocalIntermediaryDataManagerManualAddAndDeleteTest
     // Each file size is 138 bytes after compression
     final File segmentDir = tempDir.newFolder();
     FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+    FileUtils.writeByteArrayToFile(new File(segmentDir, "version.bin"), Ints.toByteArray(9));
     return segmentDir;
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
index f5fbb7f..f05a1dc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java
@@ -19,17 +19,30 @@
 
 package org.apache.druid.indexing.worker.shuffle;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
+import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteSource;
 import com.google.common.io.Files;
 import com.google.common.primitives.Ints;
+import com.google.inject.Injector;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
+import org.apache.druid.guice.GuiceAnnotationIntrospector;
+import org.apache.druid.guice.GuiceInjectableValues;
+import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.FileUtils.FileCopyResult;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.segment.loading.LocalDataSegmentPuller;
+import org.apache.druid.segment.loading.LocalDataSegmentPusher;
+import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
+import org.apache.druid.segment.loading.LocalLoadSpec;
+import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
@@ -41,23 +54,45 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 
+@RunWith(Parameterized.class)
 public class ShuffleDataSegmentPusherTest
 {
+  private static final String LOCAL = "local";
+  private static final String DEEPSTORE = "deepstore";
+
+  @Parameterized.Parameters(name = "intermediateDataManager={0}")
+  public static Collection<Object[]> data()
+  {
+    return ImmutableList.of(new Object[]{LOCAL}, new Object[]{DEEPSTORE});
+  }
+
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-  private LocalIntermediaryDataManager intermediaryDataManager;
+  private IntermediaryDataManager intermediaryDataManager;
   private ShuffleDataSegmentPusher segmentPusher;
+  private ObjectMapper mapper;
+
+  private final String intermediateDataStore;
+  private File localDeepStore;
+
+  public ShuffleDataSegmentPusherTest(String intermediateDataStore)
+  {
+    this.intermediateDataStore = intermediateDataStore;
+  }
 
   @Before
   public void setup() throws IOException
@@ -77,19 +112,47 @@ public class ShuffleDataSegmentPusherTest
         false
     );
     final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient();
-    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    if (LOCAL.equals(intermediateDataStore)) {
+      intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, indexingServiceClient);
+    } else if (DEEPSTORE.equals(intermediateDataStore)) {
+      localDeepStore = temporaryFolder.newFolder("localStorage");
+      intermediaryDataManager = new DeepStorageIntermediaryDataManager(
+          new LocalDataSegmentPusher(
+              new LocalDataSegmentPusherConfig()
+              {
+                @Override
+                public File getStorageDirectory()
+                {
+                  return localDeepStore;
+                }
+              }));
+    }
     intermediaryDataManager.start();
     segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", intermediaryDataManager);
+
+    final Injector injector = GuiceInjectors.makeStartupInjectorWithModules(
+        ImmutableList.of(
+            binder -> binder.bind(LocalDataSegmentPuller.class)
+        )
+    );
+    mapper = new DefaultObjectMapper();
+    mapper.registerModule(new SimpleModule("loadSpecTest").registerSubtypes(LocalLoadSpec.class));
+    mapper.setInjectableValues(new GuiceInjectableValues(injector));
+    final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
+    mapper.setAnnotationIntrospectors(
+        new AnnotationIntrospectorPair(guiceIntrospector, mapper.getSerializationConfig().getAnnotationIntrospector()),
+        new AnnotationIntrospectorPair(guiceIntrospector, mapper.getDeserializationConfig().getAnnotationIntrospector())
+    );
   }
 
   @After
-  public void teardown() throws InterruptedException
+  public void teardown()
   {
     intermediaryDataManager.stop();
   }
 
   @Test
-  public void testPush() throws IOException
+  public void testPush() throws IOException, SegmentLoadingException
   {
     final File segmentDir = generateSegmentDir();
     final DataSegment segment = newSegment(Intervals.of("2018/2019"));
@@ -98,21 +161,33 @@ public class ShuffleDataSegmentPusherTest
     Assert.assertEquals(9, pushed.getBinaryVersion().intValue());
     Assert.assertEquals(14, pushed.getSize()); // 10 bytes data + 4 bytes version
 
-    final Optional<ByteSource> zippedSegment = intermediaryDataManager.findPartitionFile(
-        "supervisorTaskId",
-        "subTaskId",
-        segment.getInterval(),
-        segment.getShardSpec().getPartitionNum()
-    );
-    Assert.assertTrue(zippedSegment.isPresent());
     final File tempDir = temporaryFolder.newFolder();
-    final FileCopyResult result = CompressionUtils.unzip(
-        zippedSegment.get(),
-        tempDir,
-        org.apache.druid.java.util.common.FileUtils.IS_EXCEPTION,
-        false
-    );
-    final List<File> unzippedFiles = new ArrayList<>(result.getFiles());
+    if (intermediaryDataManager instanceof LocalIntermediaryDataManager) {
+      final Optional<ByteSource> zippedSegment = intermediaryDataManager.findPartitionFile(
+          "supervisorTaskId",
+          "subTaskId",
+          segment.getInterval(),
+          segment.getShardSpec().getPartitionNum()
+      );
+      Assert.assertTrue(zippedSegment.isPresent());
+      CompressionUtils.unzip(
+          zippedSegment.get(),
+          tempDir,
+          org.apache.druid.java.util.common.FileUtils.IS_EXCEPTION,
+          false
+      );
+    } else if (intermediaryDataManager instanceof DeepStorageIntermediaryDataManager) {
+      final LoadSpec loadSpec = mapper.convertValue(pushed.getLoadSpec(), LoadSpec.class);
+      Assert.assertTrue(pushed.getLoadSpec()
+                              .get("path")
+                              .toString()
+                              .startsWith(localDeepStore.getAbsolutePath()
+                                          + "/"
+                                          + DeepStorageIntermediaryDataManager.SHUFFLE_DATA_DIR_PREFIX));
+      loadSpec.loadSegment(tempDir);
+    }
+
+    final List<File> unzippedFiles = Arrays.asList(tempDir.listFiles());
     unzippedFiles.sort(Comparator.comparing(File::getName));
     final File dataFile = unzippedFiles.get(0);
     Assert.assertEquals("test", dataFile.getName());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
index 798b05f..0334030 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.worker.shuffle;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.primitives.Ints;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
@@ -153,7 +154,7 @@ public class ShuffleResourceTest
     final Map<String, PerDatasourceShuffleMetrics> snapshot = shuffleMetrics.snapshotAndReset();
     Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
     Assert.assertEquals(1, snapshot.get(supervisorTaskId).getShuffleRequests());
-    Assert.assertEquals(134, snapshot.get(supervisorTaskId).getShuffleBytes());
+    Assert.assertEquals(254, snapshot.get(supervisorTaskId).getShuffleBytes());
   }
 
   @Test
@@ -213,6 +214,7 @@ public class ShuffleResourceTest
     // Each file size is 138 bytes after compression
     final File segmentDir = tempDir.newFolder();
     FileUtils.write(new File(segmentDir, fileName), "test data.", StandardCharsets.UTF_8);
+    FileUtils.writeByteArrayToFile(new File(segmentDir, "version.bin"), Ints.toByteArray(9));
     return segmentDir;
   }
 }
diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index 845bf21..1dc42c8 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -123,8 +123,8 @@ ENTRYPOINT /tls/generate-server-certs-and-keystores.sh \
             # Some test groups require pre-existing data to be setup
             && setupData \
             # Export the service config file path to use in supervisord conf file
-            && export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \
+            && export DRUID_SERVICE_CONF_DIR="$(. /druid.sh; getConfPath ${DRUID_SERVICE})" \
             # Export the common config file path to use in supervisord conf file
-            && export DRUID_SERVICE_CONF_DIR="$(. /druid.sh; getConfPath _common)" \
+            && export DRUID_COMMON_CONF_DIR="$(. /druid.sh; getConfPath _common)" \
             # Run Druid service using supervisord
             && exec /usr/bin/supervisord -c /etc/supervisor/conf.d/supervisord.conf
diff --git a/integration-tests/docker/docker-compose.shuffle-deep-store.yml b/integration-tests/docker/docker-compose.shuffle-deep-store.yml
new file mode 100644
index 0000000..cf2670e
--- /dev/null
+++ b/integration-tests/docker/docker-compose.shuffle-deep-store.yml
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: "2.2"
+services:
+  druid-zookeeper-kafka:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-zookeeper-kafka
+
+  druid-metadata-storage:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-metadata-storage
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-zookeeper-kafka
+
+  druid-coordinator:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-coordinator
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-metadata-storage
+      - druid-zookeeper-kafka
+
+  druid-overlord:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-overlord
+    env_file:
+      - ./environment-configs/common-shuffle-deep-store
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-coordinator
+      - druid-metadata-storage
+      - druid-zookeeper-kafka
+
+  druid-historical:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-historical
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-zookeeper-kafka
+
+  druid-middlemanager:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-middlemanager
+    env_file:
+      - ./environment-configs/common-shuffle-deep-store
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-zookeeper-kafka
+      - druid-overlord
+
+  druid-broker:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-broker
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-coordinator
+      - druid-zookeeper-kafka
+      - druid-middlemanager
+      - druid-historical
+
+  druid-router:
+    extends:
+      file: docker-compose.base.yml
+      service: druid-router
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+    depends_on:
+      - druid-zookeeper-kafka
+      - druid-coordinator
+      - druid-broker
+      - druid-overlord
+
+networks:
+  druid-it-net:
+    name: druid-it-net
+    ipam:
+      config:
+        - subnet: 172.172.172.0/24
\ No newline at end of file
diff --git a/integration-tests/docker/environment-configs/common-shuffle-deep-store b/integration-tests/docker/environment-configs/common-shuffle-deep-store
new file mode 100644
index 0000000..30117bf
--- /dev/null
+++ b/integration-tests/docker/environment-configs/common-shuffle-deep-store
@@ -0,0 +1,81 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+LANG=C.UTF-8
+LANGUAGE=C.UTF-8
+LC_ALL=C.UTF-8
+
+# JAVA OPTS
+COMMON_DRUID_JAVA_OPTS=-Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml -XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp
+DRUID_DEP_LIB_DIR=/shared/hadoop_xml:/shared/docker/lib/*:/usr/local/druid/lib/mysql-connector-java.jar
+
+# Druid configs
+druid_extensions_loadList=[]
+druid_extensions_directory=/shared/docker/extensions
+druid_auth_authenticator_basic_authorizerName=basic
+druid_auth_authenticator_basic_initialAdminPassword=priest
+druid_auth_authenticator_basic_initialInternalClientPassword=warlock
+druid_auth_authenticator_basic_type=basic
+druid_auth_authenticatorChain=["basic"]
+druid_auth_authorizer_basic_type=basic
+druid_auth_authorizers=["basic"]
+druid_client_https_certAlias=druid
+druid_client_https_keyManagerPassword=druid123
+druid_client_https_keyStorePassword=druid123
+druid_client_https_keyStorePath=/tls/server.jks
+druid_client_https_protocol=TLSv1.2
+druid_client_https_trustStoreAlgorithm=PKIX
+druid_client_https_trustStorePassword=druid123
+druid_client_https_trustStorePath=/tls/truststore.jks
+druid_enableTlsPort=true
+druid_escalator_authorizerName=basic
+druid_escalator_internalClientPassword=warlock
+druid_escalator_internalClientUsername=druid_system
+druid_escalator_type=basic
+druid_lookup_numLookupLoadingThreads=1
+druid_server_http_numThreads=20
+# Allow OPTIONS method for ITBasicAuthConfigurationTest.testSystemSchemaAccess
+druid_server_http_allowedHttpMethods=["OPTIONS"]
+druid_server_https_certAlias=druid
+druid_server_https_keyManagerPassword=druid123
+druid_server_https_keyStorePassword=druid123
+druid_server_https_keyStorePath=/tls/server.jks
+druid_server_https_keyStoreType=jks
+druid_server_https_requireClientCertificate=true
+druid_server_https_trustStoreAlgorithm=PKIX
+druid_server_https_trustStorePassword=druid123
+druid_server_https_trustStorePath=/tls/truststore.jks
+druid_server_https_validateHostnames=true
+druid_zk_service_host=druid-zookeeper-kafka
+druid_auth_basic_common_maxSyncRetries=20
+druid_indexer_logs_directory=/shared/tasklogs
+druid_sql_enable=true
+druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies
+druid_request_logging_type=slf4j
+druid_coordinator_kill_supervisor_on=true
+druid_coordinator_kill_supervisor_period=PT10S
+druid_coordinator_kill_supervisor_durationToRetain=PT0M
+druid_coordinator_period_metadataStoreManagementPeriod=PT10S
+
+# Testing the legacy config from https://github.com/apache/druid/pull/10267
+# Can remove this when the flag is no longer needed
+druid_indexer_task_ignoreTimestampSpecForDruidInputSource=true
+# Test with deep storage as intermediate location to store shuffle data
+# Local deep storage will be used here
+druid_processing_intermediaryData_storage_type=deepstore
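
In the integration-test docker images, environment variables of the form druid_a_b_c are
mapped by the container startup script to the runtime properties druid.a.b.c, so the last
line above corresponds to the new property introduced by this patch. As a minimal sketch,
assuming deep storage itself is already configured, the equivalent middleManager/indexer
runtime.properties entry would be:

    # write shuffle (intermediary) data to deep storage instead of local disk
    druid.processing.intermediaryData.storage.type=deepstore
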
diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh
index db25696..ff84fd3 100644
--- a/integration-tests/script/docker_compose_args.sh
+++ b/integration-tests/script/docker_compose_args.sh
@@ -69,6 +69,10 @@ getComposeArgs()
     then
       # default + schema registry container
       echo "-f ${DOCKERDIR}/docker-compose.yml -f ${DOCKERDIR}/docker-compose.schema-registry.yml"
+    elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "shuffle-deep-store" ]
+    then
+      # default services, with deep storage used for shuffle (intermediary) data
+      echo "-f ${DOCKERDIR}/docker-compose.shuffle-deep-store.yml"
     else
       # default
       echo "-f ${DOCKERDIR}/docker-compose.yml"
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index bd346ff..f583be4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -155,4 +155,6 @@ public class TestNGGroup
   public static final String KINESIS_DATA_FORMAT = "kinesis-data-format";
 
   public static final String HIGH_AVAILABILTY = "high-availability";
+
+  public static final String SHUFFLE_DEEP_STORE = "shuffle-deep-store";
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
index 1bb1a79..1cd90f0 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITPerfectRollupParallelIndexTest.java
@@ -36,7 +36,7 @@ import org.testng.annotations.Test;
 import java.io.Closeable;
 import java.util.function.Function;
 
-@Test(groups = TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX)
+@Test(groups = {TestNGGroup.PERFECT_ROLLUP_PARALLEL_BATCH_INDEX, TestNGGroup.SHUFFLE_DEEP_STORE})
 @Guice(moduleFactory = DruidTestModuleFactory.class)
 public class ITPerfectRollupParallelIndexTest extends AbstractITBatchIndexTest
 {
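
Tagging the class with both groups lets the same perfect-rollup parallel indexing tests run against the deep-storage shuffle setup. A hedged example of selecting the new group is below; the Maven profile name and flags follow common Druid integration-test conventions and are assumptions, not part of this commit.

    # Assumed invocation for running only the new test group.
    mvn verify -P integration-tests -Dgroups=shuffle-deep-store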
diff --git a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
index 5117ff0..fc8aeb8 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/LocalDataSegmentPusher.java
@@ -65,8 +65,14 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
   public DataSegment push(final File dataSegmentFile, final DataSegment segment, final boolean useUniquePath)
       throws IOException
   {
+    return pushToPath(dataSegmentFile, segment, this.getStorageDir(segment, useUniquePath));
+  }
+
+  @Override
+  public DataSegment pushToPath(File dataSegmentFile, DataSegment segment, String storageDirSuffix) throws IOException
+  {
     final File baseStorageDir = config.getStorageDirectory();
-    final File outDir = new File(baseStorageDir, this.getStorageDir(segment, useUniquePath));
+    final File outDir = new File(baseStorageDir, storageDirSuffix);
 
     log.debug("Copying segment[%s] to local filesystem at location[%s]", segment.getId(), outDir.toString());
 
@@ -81,7 +87,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
                     .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
     }
 
-    final File tmpOutDir = new File(baseStorageDir, makeIntermediateDir());
+    final File tmpOutDir = new File(config.getStorageDirectory(), makeIntermediateDir());
     log.debug("Creating intermediate directory[%s] for segment[%s].", tmpOutDir.toString(), segment.getId());
     org.apache.commons.io.FileUtils.forceMkdir(tmpOutDir);
 
@@ -90,8 +96,8 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
       final long size = compressSegment(dataSegmentFile, tmpIndexFile);
 
       final DataSegment dataSegment = segment.withLoadSpec(makeLoadSpec(new File(outDir, INDEX_FILENAME).toURI()))
-                                       .withSize(size)
-                                       .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
+                                             .withSize(size)
+                                             .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile));
 
       org.apache.commons.io.FileUtils.forceMkdir(outDir);
       final File indexFileTarget = new File(outDir, tmpIndexFile.getName());
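
The new pushToPath() override lets a caller pick the storage directory suffix instead of the segment's canonical path, which is what allows shuffle partitions to be written to deep storage under a dedicated prefix. Below is a minimal sketch of such a caller; the class, method, and prefix names are illustrative assumptions and not the commit's DeepStorageIntermediaryDataManager.

    import java.io.File;
    import java.io.IOException;

    import org.apache.druid.segment.loading.DataSegmentPusher;
    import org.apache.druid.timeline.DataSegment;

    // Hypothetical helper showing how shuffle data could be pushed to deep storage
    // via the new pushToPath() method instead of the segment's usual location.
    public class ShuffleDataPushSketch
    {
      // Assumed prefix under the deep storage root for intermediary data.
      private static final String SHUFFLE_DATA_PREFIX = "shuffle-data";

      private final DataSegmentPusher pusher;

      public ShuffleDataPushSketch(DataSegmentPusher pusher)
      {
        this.pusher = pusher;
      }

      public DataSegment pushPartition(
          File partitionZip,
          DataSegment segment,
          String supervisorTaskId,
          String subTaskId
      ) throws IOException
      {
        // Store the partition under <deep storage>/shuffle-data/<supervisorTaskId>/<subTaskId>
        // so that merge tasks can later locate it without a worker-to-worker HTTP fetch.
        final String suffix = SHUFFLE_DATA_PREFIX + "/" + supervisorTaskId + "/" + subTaskId;
        return pusher.pushToPath(partitionZip, segment, suffix);
      }
    }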
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 40a0a5a..e98c95f 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexing.worker.WorkerTaskMonitor;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.indexing.worker.http.TaskManagementResource;
 import org.apache.druid.indexing.worker.http.WorkerResource;
+import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.ShuffleModule;
@@ -185,6 +186,7 @@ public class CliMiddleManager extends ServerRunnable
                 Key.get(IntermediaryDataManager.class)
             );
             biddy.addBinding("local").to(LocalIntermediaryDataManager.class);
+            biddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);
           }
 
           @Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index 8c1fb00..ab02a07 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -77,6 +77,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.batch.parallel.DeepStorageShuffleClient;
 import org.apache.druid.indexing.common.task.batch.parallel.HttpShuffleClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClient;
 import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTaskClientFactory;
@@ -88,6 +89,7 @@ import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.worker.executor.ExecutorLifecycle;
 import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig;
+import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
 import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
@@ -467,6 +469,7 @@ public class CliPeon extends GuiceRunnable
         Key.get(IntermediaryDataManager.class)
     );
     intermediaryDataManagerBiddy.addBinding("local").to(LocalIntermediaryDataManager.class).in(LazySingleton.class);
+    intermediaryDataManagerBiddy.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);
 
     PolyBind.createChoice(
         binder,
@@ -479,5 +482,6 @@ public class CliPeon extends GuiceRunnable
         Key.get(ShuffleClient.class)
     );
     shuffleClientBiddy.addBinding("local").to(HttpShuffleClient.class).in(LazySingleton.class);
+    shuffleClientBiddy.addBinding("deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class);
   }
 }
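
Both map bindings above follow Druid's PolyBind pattern: the string key passed to addBinding() is matched against a runtime property at startup (apparently the same storage-type property set in the docker config earlier in this commit), so "local" keeps the existing behavior while "deepstore" switches both the worker-side IntermediaryDataManager and the task-side ShuffleClient to the deep-storage implementations. A condensed sketch of the registration side follows; the enclosing class and method are assumptions, while the binding keys and target classes come from the diff.

    import com.google.inject.Binder;
    import com.google.inject.Key;
    import com.google.inject.multibindings.MapBinder;

    import org.apache.druid.guice.LazySingleton;
    import org.apache.druid.guice.PolyBind;
    import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
    import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
    import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;

    // Hypothetical module excerpt; the actual wiring lives in CliPeon/CliMiddleManager.
    public class ShuffleBindingSketch
    {
      static void bindIntermediaryDataManager(Binder binder)
      {
        // The map key ("local" / "deepstore") is resolved from a runtime property
        // by PolyBind; the matching implementation is then injected wherever an
        // IntermediaryDataManager is requested.
        final MapBinder<String, IntermediaryDataManager> managers =
            PolyBind.optionBinder(binder, Key.get(IntermediaryDataManager.class));
        managers.addBinding("local").to(LocalIntermediaryDataManager.class).in(LazySingleton.class);
        managers.addBinding("deepstore").to(DeepStorageIntermediaryDataManager.class).in(LazySingleton.class);
      }
    }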
diff --git a/website/.spelling b/website/.spelling
index 76c314a..2f7b016 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -240,6 +240,7 @@ datasketches
 datasource
 datasources
 dbcp
+deepstore
 denormalization
 denormalize
 denormalized
@@ -427,6 +428,7 @@ unintuitive
 unioned
 unmergeable
 unmerged
+UNNEST
 unparseable
 unparsed
 unsetting

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org