You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/01/10 17:50:21 UTC

[incubator-druid] branch master updated: Add support maxRowsPerSegment for auto compaction (#6780)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c35a39d  Add support maxRowsPerSegment for auto compaction (#6780)
c35a39d is described below

commit c35a39d70bf705aa49c3a3c97bab87959bb80a4e
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Jan 10 09:50:14 2019 -0800

    Add support maxRowsPerSegment for auto compaction (#6780)
    
    * Add support maxRowsPerSegment for auto compaction
    
    * fix build
    
    * fix build
    
    * fix teamcity
    
    * add test
    
    * fix test
    
    * address comment
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   1 +
 docs/content/configuration/index.md                |  15 +-
 docs/content/ingestion/compaction.md               |   7 +-
 docs/content/ingestion/native_tasks.md             |  12 +-
 docs/content/tutorials/tutorial-batch.md           |   2 +-
 docs/content/tutorials/tutorial-compaction.md      |   4 +-
 docs/content/tutorials/tutorial-ingestion-spec.md  |   4 +-
 docs/content/tutorials/tutorial-rollup.md          |   2 +-
 docs/content/tutorials/tutorial-transform-spec.md  |   2 +-
 .../tutorial/compaction-final-index.json           |   2 +-
 .../quickstart/tutorial/compaction-init-index.json |   2 +-
 examples/quickstart/tutorial/deletion-index.json   |   2 +-
 examples/quickstart/tutorial/retention-index.json  |   2 +-
 examples/quickstart/tutorial/rollup-index.json     |   2 +-
 examples/quickstart/tutorial/transform-index.json  |   2 +-
 .../quickstart/tutorial/updates-append-index.json  |   2 +-
 .../quickstart/tutorial/updates-append-index2.json |   2 +-
 .../quickstart/tutorial/updates-init-index.json    |   2 +-
 .../tutorial/updates-overwrite-index.json          |   2 +-
 examples/quickstart/tutorial/wikipedia-index.json  |   2 +-
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |   6 +-
 .../KafkaSupervisorTuningConfigTest.java           |   4 +-
 .../kinesis/KinesisIndexTaskTuningConfig.java      |  64 ++---
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |   6 +-
 .../KinesisSupervisorTuningConfigTest.java         |   4 +-
 .../index/RealtimeAppenderatorTuningConfig.java    |   2 +-
 .../druid/indexing/common/task/CompactionTask.java | 219 +++++++++++++---
 .../druid/indexing/common/task/IndexTask.java      |  98 ++++---
 .../task/batch/parallel/ParallelIndexSubTask.java  |  24 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   3 +-
 .../batch/parallel/ParallelIndexTuningConfig.java  |   5 +-
 .../SeekableStreamIndexTaskTuningConfig.java       |   2 +-
 .../common/task/CompactionTaskRunTest.java         | 133 +++-------
 .../indexing/common/task/CompactionTaskTest.java   | 282 ++++++++++++++-------
 .../druid/indexing/common/task/IndexTaskTest.java  |  32 ++-
 .../druid/indexing/common/task/TaskSerdeTest.java  |  31 ++-
 .../ParallelIndexSupervisorTaskResourceTest.java   |   1 +
 .../ParallelIndexSupervisorTaskSerdeTest.java      |   1 +
 .../parallel/ParallelIndexSupervisorTaskTest.java  |   1 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   3 +
 .../resources/indexer/wikipedia_index_task.json    |   2 +-
 .../druid/client/indexing/ClientCompactQuery.java  |  18 +-
 .../indexing/ClientCompactQueryTuningConfig.java   |  89 ++++---
 .../client/indexing/HttpIndexingServiceClient.java |   2 +-
 .../client/indexing/IndexingServiceClient.java     |   2 +-
 .../realtime/appenderator/AppenderatorConfig.java  |   3 +-
 .../appenderator/AppenderatorDriverAddResult.java  |  15 +-
 .../coordinator/DataSourceCompactionConfig.java    | 113 +++++----
 .../helper/DruidCoordinatorSegmentCompactor.java   |   6 +-
 .../helper/NewestSegmentFirstIterator.java         |   2 +-
 .../client/indexing/NoopIndexingServiceClient.java |   2 +-
 .../DataSourceCompactionConfigTest.java            | 104 ++++++++
 .../DruidCoordinatorSegmentCompactorTest.java      |   4 +-
 .../helper/NewestSegmentFirstPolicyTest.java       |   1 +
 54 files changed, 840 insertions(+), 515 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 658a331..d84bbfd 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -97,6 +97,7 @@ public class NewestSegmentFirstPolicyBenchmark
               null,
               null,
               null,
+              null,
               null
           )
       );
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 77a89ab..f112b15 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -806,10 +806,11 @@ A description of the compaction config is:
 |`keepSegmentGranularity`|Set [keepSegmentGranularity](../ingestion/compaction.html) to true for compactionTask.|no (default = true)|
 |`taskPriority`|[Priority](../ingestion/tasks.html#task-priorities) of compact task.|no (default = 25)|
 |`inputSegmentSizeBytes`|Total input segment size of a compactionTask.|no (default = 419430400)|
-|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value.|no (default = 419430400)|
+|`targetCompactionSizeBytes`|The target segment size of compaction. The actual size of a compact segment might be slightly larger or smaller than this value. This configuration cannot be used together with `maxRowsPerSegment`.|no (default = 419430400 if `maxRowsPerSegment` is not specified)|
+|`maxRowsPerSegment`|Max number of rows per segment after compaction. This configuration cannot be used together with `targetCompactionSizeBytes`.|no|
 |`maxNumSegmentsToCompact`|Max number of segments to compact together.|no (default = 150)|
 |`skipOffsetFromLatest`|The offset for searching segments to be compacted. Strongly recommended to set for realtime dataSources. |no (default = "P1D")|
-|`tuningConfig`|Tuning config for compact tasks. See below [Compact Task TuningConfig](#compact-task-tuningconfig).|no|
+|`tuningConfig`|Tuning config for compact tasks. See [Compaction TuningConfig](#compaction-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.html#task-context) for compact tasks.|no|
 
 An example of compaction config is:
@@ -822,6 +823,16 @@ An example of compaction config is:
 
 For realtime dataSources, it's recommended to set `skipOffsetFromLatest` to some sufficiently large value to avoid frequent compact task failures.
 
+##### Compaction TuningConfig
+
+|Property|Description|Required|
+|--------|-----------|--------|
+|`maxRowsInMemory`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 1000000)|
+|`maxTotalRows`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 20000000)|
+|`indexSpec`|See [IndexSpec](../ingestion/native_tasks.html#indexspec)|no|
+|`maxPendingPersists`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 0 (meaning one persist can be running concurrently with ingestion, and none can be queued up))|
+|`pushTimeout`|See [tuningConfig for indexTask](../ingestion/native_tasks.html#tuningconfig)|no (default = 0)|
+
 ## Overlord
 
 For general Overlord Node information, see [here](../design/indexing-service.html).
diff --git a/docs/content/ingestion/compaction.md b/docs/content/ingestion/compaction.md
index 2991584..4d1b71b 100644
--- a/docs/content/ingestion/compaction.md
+++ b/docs/content/ingestion/compaction.md
@@ -47,10 +47,11 @@ Compaction tasks merge all segments of the given interval. The syntax is:
 |`id`|Task id|No|
 |`dataSource`|DataSource name to be compacted|Yes|
 |`interval`|Interval of segments to be compacted|Yes|
-|`dimensions`|Custom dimensionsSpec. compaction task will use this dimensionsSpec if exist instead of generating one. See below for more details.|No|
+|`dimensionsSpec`|Custom dimensionsSpec. Compaction task will use this dimensionsSpec if it exists instead of generating one. See below for more details.|No|
+|`metricsSpec`|Custom metricsSpec. Compaction task will use this metricsSpec if specified rather than generating one.|No|
 |`segmentGranularity`|If this is set, compactionTask will change the segment granularity for the given interval. See [segmentGranularity of Uniform Granularity Spec](./ingestion-spec.html#uniform-granularity-spec) for more details. See the below table for the behavior.|No|
 |`keepSegmentGranularity`|Deprecated. Please use `segmentGranularity` instead. See the below table for its behavior.|No|
-|`targetCompactionSizeBytes`|Target segment size after comapction. Cannot be used with `targetPartitionSize`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
+|`targetCompactionSizeBytes`|Target segment size after compaction. Cannot be used with `maxRowsPerSegment`, `maxTotalRows`, and `numShards` in tuningConfig.|No|
 |`tuningConfig`|[Index task tuningConfig](../ingestion/native_tasks.html#tuningconfig)|No|
 |`context`|[Task context](../ingestion/locking-and-priority.html#task-context)|No|
 
@@ -77,7 +78,7 @@ An example of compaction task is
 
 This compaction task reads _all segments_ of the interval `2017-01-01/2018-01-01` and results in new segments.
 Since both `segmentGranularity` and `keepSegmentGranularity` are null, the original segment granularity will be remained and not changed after compaction.
-To control the number of result segments per time chunk, you can set `targetPartitionSize` or `numShards`. See [indexTuningConfig](../ingestion/native_tasks.html#tuningconfig) for more details.
+To control the number of result segments per time chunk, you can set [maxRowsPerSegment](../configuration/index.html#compaction-dynamic-configuration) or [numShards](../ingestion/native_tasks.html#tuningconfig).
 Please note that you can run multiple compactionTasks at the same time. For example, you can run 12 compactionTasks per month instead of running a single task for the entire year.
 
 A compaction task internally generates an `index` task spec for performing compaction work with some fixed parameters.
diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index f4d4061..1f2c453 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -152,11 +152,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|The task type, this should always be `index_parallel`.|none|yes|
-|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
+|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
 |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
 |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
-|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
+|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
 |forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
@@ -337,7 +337,7 @@ An example of the result is
       },
       "tuningConfig": {
         "type": "index_parallel",
-        "targetPartitionSize": 5000000,
+        "maxRowsPerSegment": 5000000,
         "maxRowsInMemory": 1000000,
         "maxTotalRows": 20000000,
         "numShards": null,
@@ -454,7 +454,7 @@ The Local Index Task is designed to be used for smaller data sets. The task exec
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 1000000
     }
   }
@@ -491,11 +491,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|The task type, this should always be "index".|none|yes|
-|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
+|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
 |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
 |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
-|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
+|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
diff --git a/docs/content/tutorials/tutorial-batch.md b/docs/content/tutorials/tutorial-batch.md
index ebe10bb..972569f 100644
--- a/docs/content/tutorials/tutorial-batch.md
+++ b/docs/content/tutorials/tutorial-batch.md
@@ -98,7 +98,7 @@ which has been configured to read the `quickstart/tutorial/wikiticker-2015-09-12
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/docs/content/tutorials/tutorial-compaction.md b/docs/content/tutorials/tutorial-compaction.md
index ac4445e..daf1f3c 100644
--- a/docs/content/tutorials/tutorial-compaction.md
+++ b/docs/content/tutorials/tutorial-compaction.md
@@ -74,7 +74,7 @@ We have included a compaction task spec for this tutorial datasource at `quickst
   "interval": "2015-09-12/2015-09-13",
   "tuningConfig" : {
     "type" : "index",
-    "targetPartitionSize" : 5000000,
+    "maxRowsPerSegment" : 5000000,
     "maxRowsInMemory" : 25000,
     "forceExtendableShardSpecs" : true
   }
@@ -85,7 +85,7 @@ This will compact all segments for the interval `2015-09-12/2015-09-13` in the `
 
 The parameters in the `tuningConfig` control how many segments will be present in the compacted set of segments. 
 
-In this tutorial example, only one compacted segment will be created, as the 39244 rows in the input is less than the 5000000 `targetPartitionSize`.
+In this tutorial example, only one compacted segment will be created, as the 39244 rows in the input are fewer than the 5000000 `maxRowsPerSegment`.
 
 Let's submit this task now:
 
diff --git a/docs/content/tutorials/tutorial-ingestion-spec.md b/docs/content/tutorials/tutorial-ingestion-spec.md
index 1c939ae..3311a6f 100644
--- a/docs/content/tutorials/tutorial-ingestion-spec.md
+++ b/docs/content/tutorials/tutorial-ingestion-spec.md
@@ -564,7 +564,7 @@ As an example, let's add a `tuningConfig` that sets a target segment size for th
 ```json
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000
+      "maxRowsPerSegment" : 5000000
     }
 ```
 
@@ -623,7 +623,7 @@ We've finished defining the ingestion spec, it should now look like the followin
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000
+      "maxRowsPerSegment" : 5000000
     }
   }
 }
diff --git a/docs/content/tutorials/tutorial-rollup.md b/docs/content/tutorials/tutorial-rollup.md
index 1828dba..013fa0e 100644
--- a/docs/content/tutorials/tutorial-rollup.md
+++ b/docs/content/tutorials/tutorial-rollup.md
@@ -99,7 +99,7 @@ We'll ingest this data using the following ingestion task spec, located at `quic
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/docs/content/tutorials/tutorial-transform-spec.md b/docs/content/tutorials/tutorial-transform-spec.md
index ba9933c..60d7bf8 100644
--- a/docs/content/tutorials/tutorial-transform-spec.md
+++ b/docs/content/tutorials/tutorial-transform-spec.md
@@ -114,7 +114,7 @@ We will ingest the sample data using the following spec, which demonstrates the
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/compaction-final-index.json b/examples/quickstart/tutorial/compaction-final-index.json
index b84e2ed..a7ff422 100644
--- a/examples/quickstart/tutorial/compaction-final-index.json
+++ b/examples/quickstart/tutorial/compaction-final-index.json
@@ -4,7 +4,7 @@
   "interval": "2015-09-12/2015-09-13",
   "tuningConfig" : {
     "type" : "index",
-    "targetPartitionSize" : 5000000,
+    "maxRowsPerSegment" : 5000000,
     "maxRowsInMemory" : 25000,
     "forceExtendableShardSpecs" : true
   }
diff --git a/examples/quickstart/tutorial/compaction-init-index.json b/examples/quickstart/tutorial/compaction-init-index.json
index 74a00df..90ee826 100644
--- a/examples/quickstart/tutorial/compaction-init-index.json
+++ b/examples/quickstart/tutorial/compaction-init-index.json
@@ -56,7 +56,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/deletion-index.json b/examples/quickstart/tutorial/deletion-index.json
index 57d1ccd..d84dc21 100644
--- a/examples/quickstart/tutorial/deletion-index.json
+++ b/examples/quickstart/tutorial/deletion-index.json
@@ -56,7 +56,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/retention-index.json b/examples/quickstart/tutorial/retention-index.json
index 613ddcf..9aee62a 100644
--- a/examples/quickstart/tutorial/retention-index.json
+++ b/examples/quickstart/tutorial/retention-index.json
@@ -56,7 +56,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/rollup-index.json b/examples/quickstart/tutorial/rollup-index.json
index 482c751..b4d96aa 100644
--- a/examples/quickstart/tutorial/rollup-index.json
+++ b/examples/quickstart/tutorial/rollup-index.json
@@ -43,7 +43,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/transform-index.json b/examples/quickstart/tutorial/transform-index.json
index 0dfcef3..ea8ead7 100644
--- a/examples/quickstart/tutorial/transform-index.json
+++ b/examples/quickstart/tutorial/transform-index.json
@@ -65,7 +65,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/updates-append-index.json b/examples/quickstart/tutorial/updates-append-index.json
index dfa9887..b8de06a 100644
--- a/examples/quickstart/tutorial/updates-append-index.json
+++ b/examples/quickstart/tutorial/updates-append-index.json
@@ -51,7 +51,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/updates-append-index2.json b/examples/quickstart/tutorial/updates-append-index2.json
index 0e7404a..a5c49c4 100644
--- a/examples/quickstart/tutorial/updates-append-index2.json
+++ b/examples/quickstart/tutorial/updates-append-index2.json
@@ -41,7 +41,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/updates-init-index.json b/examples/quickstart/tutorial/updates-init-index.json
index 52a4aef..3620ff1 100644
--- a/examples/quickstart/tutorial/updates-init-index.json
+++ b/examples/quickstart/tutorial/updates-init-index.json
@@ -41,7 +41,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/updates-overwrite-index.json b/examples/quickstart/tutorial/updates-overwrite-index.json
index ac4785e..95f2e2d 100644
--- a/examples/quickstart/tutorial/updates-overwrite-index.json
+++ b/examples/quickstart/tutorial/updates-overwrite-index.json
@@ -41,7 +41,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/examples/quickstart/tutorial/wikipedia-index.json b/examples/quickstart/tutorial/wikipedia-index.json
index 02177d1..888e08e 100644
--- a/examples/quickstart/tutorial/wikipedia-index.json
+++ b/examples/quickstart/tutorial/wikipedia-index.json
@@ -56,7 +56,7 @@
     },
     "tuningConfig" : {
       "type" : "index",
-      "targetPartitionSize" : 5000000,
+      "maxRowsPerSegment" : 5000000,
       "maxRowsInMemory" : 25000,
       "forceExtendableShardSpecs" : true
     }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index abcf07f..14ecd4e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -58,7 +58,7 @@ public class KafkaIndexTaskTuningConfigTest
 
     Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
-    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
+    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(null, config.getMaxTotalRows());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
@@ -94,7 +94,7 @@ public class KafkaIndexTaskTuningConfigTest
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
-    Assert.assertEquals(100, config.getMaxRowsPerSegment());
+    Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertNotEquals(null, config.getMaxTotalRows());
     Assert.assertEquals(1000, config.getMaxTotalRows().longValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
@@ -134,7 +134,7 @@ public class KafkaIndexTaskTuningConfigTest
     KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
 
     Assert.assertEquals(1, copy.getMaxRowsInMemory());
-    Assert.assertEquals(2, copy.getMaxRowsPerSegment());
+    Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
     Assert.assertNotEquals(null, copy.getMaxTotalRows());
     Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
     Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 45470ff..3312a10 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -59,7 +59,7 @@ public class KafkaSupervisorTuningConfigTest
 
     Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
-    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
+    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@@ -105,7 +105,7 @@ public class KafkaSupervisorTuningConfigTest
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
-    Assert.assertEquals(100, config.getMaxRowsPerSegment());
+    Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(100, config.getMaxPendingPersists());
     Assert.assertEquals(true, config.isReportParseExceptions());
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 95ee278..534330e 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -188,61 +188,29 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
+    if (!super.equals(o)) {
+      return false;
+    }
     KinesisIndexTaskTuningConfig that = (KinesisIndexTaskTuningConfig) o;
-    return getMaxRowsInMemory() == that.getMaxRowsInMemory() &&
-           getMaxBytesInMemory() == that.getMaxBytesInMemory() &&
-           getMaxRowsPerSegment() == that.getMaxRowsPerSegment() &&
-           getMaxPendingPersists() == that.getMaxPendingPersists() &&
-           getBuildV9Directly() == that.getBuildV9Directly() &&
-           isReportParseExceptions() == that.isReportParseExceptions() &&
-           getHandoffConditionTimeout() == that.getHandoffConditionTimeout() &&
-           isResetOffsetAutomatically() == that.isResetOffsetAutomatically() &&
-           isSkipSequenceNumberAvailabilityCheck() == that.isSkipSequenceNumberAvailabilityCheck() &&
-           getRecordBufferSize() == that.getRecordBufferSize() &&
-           getRecordBufferOfferTimeout() == that.getRecordBufferOfferTimeout() &&
-           getRecordBufferFullWait() == that.getRecordBufferFullWait() &&
-           getFetchSequenceNumberTimeout() == that.getFetchSequenceNumberTimeout() &&
-           isLogParseExceptions() == that.isLogParseExceptions() &&
-           getMaxParseExceptions() == that.getMaxParseExceptions() &&
-           getMaxSavedParseExceptions() == that.getMaxSavedParseExceptions() &&
-           getMaxRecordsPerPoll() == that.getMaxRecordsPerPoll() &&
-           Objects.equals(getIntermediatePersistPeriod(), that.getIntermediatePersistPeriod()) &&
-           Objects.equals(getBasePersistDirectory(), that.getBasePersistDirectory()) &&
-           Objects.equals(getIndexSpec(), that.getIndexSpec()) &&
-           Objects.equals(getFetchThreads(), that.getFetchThreads()) &&
-           Objects.equals(getSegmentWriteOutMediumFactory(), that.getSegmentWriteOutMediumFactory()) &&
-           Objects.equals(getMaxTotalRows(), that.getMaxTotalRows()) &&
-           Objects.equals(getIntermediateHandoffPeriod(), that.getIntermediateHandoffPeriod());
+    return recordBufferSize == that.recordBufferSize &&
+           recordBufferOfferTimeout == that.recordBufferOfferTimeout &&
+           recordBufferFullWait == that.recordBufferFullWait &&
+           fetchSequenceNumberTimeout == that.fetchSequenceNumberTimeout &&
+           maxRecordsPerPoll == that.maxRecordsPerPoll &&
+           Objects.equals(fetchThreads, that.fetchThreads);
   }
 
   @Override
   public int hashCode()
   {
     return Objects.hash(
-        getMaxRowsInMemory(),
-        getMaxBytesInMemory(),
-        getMaxRowsPerSegment(),
-        getMaxTotalRows(),
-        getIntermediatePersistPeriod(),
-        getBasePersistDirectory(),
-        getMaxPendingPersists(),
-        getIndexSpec(),
-        true,
-        isReportParseExceptions(),
-        getHandoffConditionTimeout(),
-        isResetOffsetAutomatically(),
-        isSkipSequenceNumberAvailabilityCheck(),
-        getRecordBufferSize(),
-        getRecordBufferOfferTimeout(),
-        getRecordBufferFullWait(),
-        getFetchSequenceNumberTimeout(),
-        getFetchThreads(),
-        getSegmentWriteOutMediumFactory(),
-        isLogParseExceptions(),
-        getMaxParseExceptions(),
-        getMaxSavedParseExceptions(),
-        getMaxRecordsPerPoll(),
-        getIntermediateHandoffPeriod()
+        super.hashCode(),
+        recordBufferSize,
+        recordBufferOfferTimeout,
+        recordBufferFullWait,
+        fetchSequenceNumberTimeout,
+        fetchThreads,
+        maxRecordsPerPoll
     );
   }
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 4e967c4..2983324 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -65,7 +65,7 @@ public class KinesisIndexTaskTuningConfigTest
 
     Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
-    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
+    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@@ -115,7 +115,7 @@ public class KinesisIndexTaskTuningConfigTest
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
-    Assert.assertEquals(100, config.getMaxRowsPerSegment());
+    Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(100, config.getMaxPendingPersists());
     Assert.assertTrue(config.getBuildV9Directly());
@@ -197,7 +197,7 @@ public class KinesisIndexTaskTuningConfigTest
 
     Assert.assertEquals(1, copy.getMaxRowsInMemory());
     Assert.assertEquals(3, copy.getMaxBytesInMemory());
-    Assert.assertEquals(2, copy.getMaxRowsPerSegment());
+    Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(100L, (long) copy.getMaxTotalRows());
     Assert.assertEquals(new Period("PT3S"), copy.getIntermediatePersistPeriod());
     Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index 370f3ea..45151b5 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -59,7 +59,7 @@ public class KinesisSupervisorTuningConfigTest
 
     Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
-    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
+    Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@@ -105,7 +105,7 @@ public class KinesisSupervisorTuningConfigTest
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
-    Assert.assertEquals(100, config.getMaxRowsPerSegment());
+    Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(100, config.getMaxPendingPersists());
     Assert.assertEquals(true, config.getBuildV9Directly());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 6f2052d..c055202 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -149,7 +149,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
 
   @Override
   @JsonProperty
-  public int getMaxRowsPerSegment()
+  public Integer getMaxRowsPerSegment()
   {
     return maxRowsPerSegment;
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 69cd5c6..2bdf539 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -105,7 +105,10 @@ public class CompactionTask extends AbstractTask
 
   private final Interval interval;
   private final List<DataSegment> segments;
+  @Nullable
   private final DimensionsSpec dimensionsSpec;
+  @Nullable
+  private final AggregatorFactory[] metricsSpec;
   @Deprecated
   @Nullable
   private final Boolean keepSegmentGranularity;
@@ -139,7 +142,9 @@ public class CompactionTask extends AbstractTask
       @JsonProperty("dataSource") final String dataSource,
       @Nullable @JsonProperty("interval") final Interval interval,
       @Nullable @JsonProperty("segments") final List<DataSegment> segments,
-      @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
+      @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensions,
+      @Nullable @JsonProperty("dimensionsSpec") final DimensionsSpec dimensionsSpec,
+      @Nullable @JsonProperty("metricsSpec") final AggregatorFactory[] metricsSpec,
       @Nullable @JsonProperty("keepSegmentGranularity") @Deprecated final Boolean keepSegmentGranularity,
       @Nullable @JsonProperty("segmentGranularity") final Granularity segmentGranularity,
       @Nullable @JsonProperty("targetCompactionSizeBytes") final Long targetCompactionSizeBytes,
@@ -169,14 +174,15 @@ public class CompactionTask extends AbstractTask
 
     this.interval = interval;
     this.segments = segments;
-    this.dimensionsSpec = dimensionsSpec;
+    this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
+    this.metricsSpec = metricsSpec;
     this.keepSegmentGranularity = keepSegmentGranularity;
     this.segmentGranularity = segmentGranularity;
     this.targetCompactionSizeBytes = targetCompactionSizeBytes;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
     this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
-    this.partitionConfigurationManager = new PartitionConfigurationManager(this.targetCompactionSizeBytes, tuningConfig);
+    this.partitionConfigurationManager = new PartitionConfigurationManager(targetCompactionSizeBytes, tuningConfig);
     this.authorizerMapper = authorizerMapper;
     this.chatHandlerProvider = chatHandlerProvider;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
@@ -201,6 +207,12 @@ public class CompactionTask extends AbstractTask
   }
 
   @JsonProperty
+  public AggregatorFactory[] getMetricsSpec()
+  {
+    return metricsSpec;
+  }
+
+  @JsonProperty
   @Deprecated
   @Nullable
   public Boolean isKeepSegmentGranularity()
@@ -263,6 +275,7 @@ public class CompactionTask extends AbstractTask
           segmentProvider,
           partitionConfigurationManager,
           dimensionsSpec,
+          metricsSpec,
           keepSegmentGranularity,
           segmentGranularity,
           jsonMapper
@@ -321,7 +334,8 @@ public class CompactionTask extends AbstractTask
       final TaskToolbox toolbox,
       final SegmentProvider segmentProvider,
       final PartitionConfigurationManager partitionConfigurationManager,
-      final DimensionsSpec dimensionsSpec,
+      @Nullable final DimensionsSpec dimensionsSpec,
+      @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final Boolean keepSegmentGranularity,
       @Nullable final Granularity segmentGranularity,
       final ObjectMapper jsonMapper
@@ -357,6 +371,7 @@ public class CompactionTask extends AbstractTask
             segmentProvider.interval,
             queryableIndexAndSegments,
             dimensionsSpec,
+            metricsSpec,
             Granularities.ALL,
             jsonMapper
         );
@@ -388,6 +403,7 @@ public class CompactionTask extends AbstractTask
               interval,
               segmentsToCompact,
               dimensionsSpec,
+              metricsSpec,
               GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity(),
               jsonMapper
           );
@@ -414,6 +430,7 @@ public class CompactionTask extends AbstractTask
             segmentProvider.interval,
             queryableIndexAndSegments,
             dimensionsSpec,
+            metricsSpec,
             segmentGranularity,
             jsonMapper
         );
@@ -462,27 +479,19 @@ public class CompactionTask extends AbstractTask
       String dataSource,
       Interval totalInterval,
       List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
-      DimensionsSpec dimensionsSpec,
+      @Nullable DimensionsSpec dimensionsSpec,
+      @Nullable AggregatorFactory[] metricsSpec,
       Granularity segmentGranularity,
       ObjectMapper jsonMapper
   )
   {
-    // find merged aggregators
+    // check index metadata
     for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
       final QueryableIndex index = pair.lhs;
       if (index.getMetadata() == null) {
         throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getIdentifier());
       }
     }
-    final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
-        .stream()
-        .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
-        .collect(Collectors.toList());
-    final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
-
-    if (mergedAggregators == null) {
-      throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
-    }
 
     // find granularity spec
     // set rollup only if rollup is set for all segments
@@ -500,21 +509,47 @@ public class CompactionTask extends AbstractTask
     );
 
     // find unique dimensions
-    final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null ?
-                                               createDimensionsSpec(queryableIndexAndSegments) :
-                                               dimensionsSpec;
+    final DimensionsSpec finalDimensionsSpec = dimensionsSpec == null
+                                               ? createDimensionsSpec(queryableIndexAndSegments)
+                                               : dimensionsSpec;
+    final AggregatorFactory[] finalMetricsSpec = metricsSpec == null
+                                                 ? createMetricsSpec(queryableIndexAndSegments)
+                                                 : convertToCombiningFactories(metricsSpec);
     final InputRowParser parser = new NoopInputRowParser(new TimeAndDimsParseSpec(null, finalDimensionsSpec));
 
     return new DataSchema(
         dataSource,
         jsonMapper.convertValue(parser, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT),
-        mergedAggregators,
+        finalMetricsSpec,
         granularitySpec,
         null,
         jsonMapper
     );
   }
 
+  private static AggregatorFactory[] createMetricsSpec(
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+  )
+  {
+    final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
+        .stream()
+        .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
+        .collect(Collectors.toList());
+    final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
+
+    if (mergedAggregators == null) {
+      throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+    }
+    return mergedAggregators;
+  }
+
+  private static AggregatorFactory[] convertToCombiningFactories(AggregatorFactory[] metricsSpec)
+  {
+    return Arrays.stream(metricsSpec)
+                 .map(AggregatorFactory::getCombiningFactory)
+                 .toArray(AggregatorFactory[]::new);
+  }
+
   private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
   {
     final BiMap<String, Integer> uniqueDims = HashBiMap.create();
@@ -739,7 +774,7 @@ public class CompactionTask extends AbstractTask
             targetCompactionSizeBytes,
             "targetCompactionSizeBytes"
         );
-        // Find IndexTuningConfig.targetPartitionSize which is the number of rows per segment.
+        // Find IndexTuningConfig.maxRowsPerSegment which is the number of rows per segment.
         // Assume that the segment size is proportional to the number of rows. We can improve this later.
         final long totalNumRows = queryableIndexAndSegments
             .stream()
@@ -755,17 +790,17 @@ public class CompactionTask extends AbstractTask
         }
 
         final double avgRowsPerByte = totalNumRows / (double) totalSizeBytes;
-        final int targetPartitionSize = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes));
-        Preconditions.checkState(targetPartitionSize > 0, "Negative targetPartitionSize[%s]", targetPartitionSize);
+        final int maxRowsPerSegment = Math.toIntExact(Math.round(avgRowsPerByte * nonNullTargetCompactionSizeBytes));
+        Preconditions.checkState(maxRowsPerSegment > 0, "Negative maxRowsPerSegment[%s]", maxRowsPerSegment);
 
         log.info(
-            "Estimated targetPartitionSize[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]",
-            targetPartitionSize,
+            "Estimated maxRowsPerSegment[%d] = avgRowsPerByte[%f] * targetCompactionSizeBytes[%d]",
+            maxRowsPerSegment,
             avgRowsPerByte,
             nonNullTargetCompactionSizeBytes
         );
         return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig)
-            .withTargetPartitionSize(targetPartitionSize);
+            .withMaxRowsPerSegment(maxRowsPerSegment);
       } else {
         return tuningConfig;
       }
@@ -773,7 +808,7 @@ public class CompactionTask extends AbstractTask
 
     /**
      * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that
-     * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#targetPartitionSize},
+     * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment},
      * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together.
      * {@link #hasPartitionConfig} checks one of those configs is set.
      *
@@ -788,15 +823,15 @@ public class CompactionTask extends AbstractTask
         @Nullable IndexTuningConfig tuningConfig
     )
     {
-      if (targetCompactionSizeBytes != null) {
+      if (targetCompactionSizeBytes != null && tuningConfig != null) {
         Preconditions.checkArgument(
             !hasPartitionConfig(tuningConfig),
-            "targetCompactionSizeBytes[%s] cannot be used with targetPartitionSize[%s], maxTotalRows[%s],"
+            "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s], maxTotalRows[%s],"
             + " or numShards[%s] of tuningConfig",
             targetCompactionSizeBytes,
-            tuningConfig == null ? null : tuningConfig.getTargetPartitionSize(),
-            tuningConfig == null ? null : tuningConfig.getMaxTotalRows(),
-            tuningConfig == null ? null : tuningConfig.getNumShards()
+            tuningConfig.getMaxRowsPerSegment(),
+            tuningConfig.getMaxTotalRows(),
+            tuningConfig.getNumShards()
         );
         return targetCompactionSizeBytes;
       } else {
@@ -809,7 +844,7 @@ public class CompactionTask extends AbstractTask
     private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig)
     {
       if (tuningConfig != null) {
-        return tuningConfig.getTargetPartitionSize() != null
+        return tuningConfig.getMaxRowsPerSegment() != null
                || tuningConfig.getMaxTotalRows() != null
                || tuningConfig.getNumShards() != null;
       } else {
@@ -817,4 +852,124 @@ public class CompactionTask extends AbstractTask
       }
     }
   }
+
+  public static class Builder
+  {
+    private final String dataSource;
+    private final ObjectMapper jsonMapper;
+    private final AuthorizerMapper authorizerMapper;
+    private final ChatHandlerProvider chatHandlerProvider;
+    private final RowIngestionMetersFactory rowIngestionMetersFactory;
+
+    @Nullable
+    private Interval interval;
+    @Nullable
+    private List<DataSegment> segments;
+    @Nullable
+    private DimensionsSpec dimensionsSpec;
+    @Nullable
+    private AggregatorFactory[] metricsSpec;
+    @Nullable
+    private Boolean keepSegmentGranularity;
+    @Nullable
+    private Granularity segmentGranularity;
+    @Nullable
+    private Long targetCompactionSizeBytes;
+    @Nullable
+    private IndexTuningConfig tuningConfig;
+    @Nullable
+    private Map<String, Object> context;
+
+    public Builder(
+        String dataSource,
+        ObjectMapper jsonMapper,
+        AuthorizerMapper authorizerMapper,
+        ChatHandlerProvider chatHandlerProvider,
+        RowIngestionMetersFactory rowIngestionMetersFactory
+    )
+    {
+      this.dataSource = dataSource;
+      this.jsonMapper = jsonMapper;
+      this.authorizerMapper = authorizerMapper;
+      this.chatHandlerProvider = chatHandlerProvider;
+      this.rowIngestionMetersFactory = rowIngestionMetersFactory;
+    }
+
+    public Builder interval(Interval interval)
+    {
+      this.interval = interval;
+      return this;
+    }
+
+    public Builder segments(List<DataSegment> segments)
+    {
+      this.segments = segments;
+      return this;
+    }
+
+    public Builder dimensionsSpec(DimensionsSpec dimensionsSpec)
+    {
+      this.dimensionsSpec = dimensionsSpec;
+      return this;
+    }
+
+    public Builder metricsSpec(AggregatorFactory[] metricsSpec)
+    {
+      this.metricsSpec = metricsSpec;
+      return this;
+    }
+
+    public Builder keepSegmentGranularity(boolean keepSegmentGranularity)
+    {
+      this.keepSegmentGranularity = keepSegmentGranularity;
+      return this;
+    }
+
+    public Builder segmentGranularity(Granularity segmentGranularity)
+    {
+      this.segmentGranularity = segmentGranularity;
+      return this;
+    }
+
+    public Builder targetCompactionSizeBytes(long targetCompactionSizeBytes)
+    {
+      this.targetCompactionSizeBytes = targetCompactionSizeBytes;
+      return this;
+    }
+
+    public Builder tuningConfig(IndexTuningConfig tuningConfig)
+    {
+      this.tuningConfig = tuningConfig;
+      return this;
+    }
+
+    public Builder context(Map<String, Object> context)
+    {
+      this.context = context;
+      return this;
+    }
+
+    public CompactionTask build()
+    {
+      return new CompactionTask(
+          null,
+          null,
+          dataSource,
+          interval,
+          segments,
+          null,
+          dimensionsSpec,
+          metricsSpec,
+          keepSegmentGranularity,
+          segmentGranularity,
+          targetCompactionSizeBytes,
+          tuningConfig,
+          context,
+          jsonMapper,
+          authorizerMapper,
+          chatHandlerProvider,
+          rowIngestionMetersFactory
+      );
+    }
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index c1e1397..fabd4e0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -430,9 +430,9 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
       // Initialize maxRowsPerSegment and maxTotalRows lazily
       final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
-      @Nullable final Integer targetPartitionSize = getValidTargetPartitionSize(tuningConfig);
+      @Nullable final Integer maxRowsPerSegment = getValidMaxRowsPerSegment(tuningConfig);
       @Nullable final Long maxTotalRows = getValidMaxTotalRows(tuningConfig);
-      final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, targetPartitionSize);
+      final ShardSpecs shardSpecs = determineShardSpecs(toolbox, firehoseFactory, firehoseTempDir, maxRowsPerSegment);
       final DataSchema dataSchema;
       final Map<Interval, String> versions;
       if (determineIntervals) {
@@ -469,7 +469,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
           versions,
           firehoseFactory,
           firehoseTempDir,
-          targetPartitionSize,
+          maxRowsPerSegment,
           maxTotalRows
       );
     }
@@ -598,7 +598,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
       final TaskToolbox toolbox,
       final FirehoseFactory firehoseFactory,
       final File firehoseTempDir,
-      @Nullable final Integer targetPartitionSize
+      @Nullable final Integer maxRowsPerSegment
   ) throws IOException
   {
     final ObjectMapper jsonMapper = toolbox.getObjectMapper();
@@ -634,7 +634,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
           tuningConfig,
           determineIntervals,
           determineNumPartitions,
-          targetPartitionSize
+          maxRowsPerSegment
       );
     }
   }
@@ -684,7 +684,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
       IndexTuningConfig tuningConfig,
       boolean determineIntervals,
       boolean determineNumPartitions,
-      @Nullable Integer targetPartitionSize
+      @Nullable Integer maxRowsPerSegment
   ) throws IOException
   {
     log.info("Determining intervals and shardSpecs");
@@ -710,7 +710,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
       if (determineNumPartitions) {
         final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound();
         numShards = (int) Math.ceil(
-            (double) numRows / Preconditions.checkNotNull(targetPartitionSize, "targetPartitionSize")
+            (double) numRows / Preconditions.checkNotNull(maxRowsPerSegment, "maxRowsPerSegment")
         );
         log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
       } else {
@@ -866,7 +866,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
    *
    * <ul>
    * <li>
-   * If the number of rows in a segment exceeds {@link IndexTuningConfig#targetPartitionSize}
+   * If the number of rows in a segment exceeds {@link IndexTuningConfig#maxRowsPerSegment}
    * </li>
    * <li>
    * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
@@ -884,7 +884,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
       final Map<Interval, String> versions,
       final FirehoseFactory firehoseFactory,
       final File firehoseTempDir,
-      @Nullable final Integer targetPartitionSize,
+      @Nullable final Integer maxRowsPerSegment,
       @Nullable final Long maxTotalRows
   ) throws IOException, InterruptedException
   {
@@ -1031,9 +1031,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
           if (addResult.isOk()) {
             // incremental segment publishment is allowed only when rollup don't have to be perfect.
-            if (!isGuaranteedRollup &&
-                (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) ||
-                 exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator()))) {
+            if (!isGuaranteedRollup && addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) {
               // There can be some segments waiting for being published even though any rows won't be added to them.
               // If those segments are not published here, the available space in appenderator will be kept to be small
               // which makes the size of segments smaller.
@@ -1099,17 +1097,17 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
   /**
    * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null.
-   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_TARGET_PARTITION_SIZE} or the given
-   * {@link IndexTuningConfig#targetPartitionSize}.
+   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_ROWS_PER_SEGMENT} or the given
+   * {@link IndexTuningConfig#maxRowsPerSegment}.
    */
-  public static Integer getValidTargetPartitionSize(IndexTuningConfig tuningConfig)
+  public static Integer getValidMaxRowsPerSegment(IndexTuningConfig tuningConfig)
   {
     @Nullable final Integer numShards = tuningConfig.numShards;
-    @Nullable final Integer targetPartitionSize = tuningConfig.targetPartitionSize;
+    @Nullable final Integer maxRowsPerSegment = tuningConfig.maxRowsPerSegment;
     if (numShards == null || numShards == -1) {
-      return targetPartitionSize == null || targetPartitionSize.equals(-1)
-             ? IndexTuningConfig.DEFAULT_TARGET_PARTITION_SIZE
-             : targetPartitionSize;
+      return maxRowsPerSegment == null || maxRowsPerSegment.equals(-1)
+             ? IndexTuningConfig.DEFAULT_MAX_ROWS_PER_SEGMENT
+             : maxRowsPerSegment;
     } else {
       return null;
     }
@@ -1154,23 +1152,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
     }
   }
 
-  private static boolean exceedMaxRowsInSegment(
-      @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig
-      int numRowsInSegment
-  )
-  {
-    return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
-  }
-
-  private static boolean exceedMaxRowsInAppenderator(
-      // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig
-      @Nullable final Long maxRowsInAppenderator,
-      long numRowsInAppenderator
-  )
-  {
-    return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator;
-  }
-
   private static SegmentsAndMetadata awaitPublish(
       ListenableFuture<SegmentsAndMetadata> publishFuture,
       long publishTimeout
@@ -1336,7 +1317,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
   @JsonTypeName("index")
   public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
   {
-    static final int DEFAULT_TARGET_PARTITION_SIZE = 5_000_000;
+    static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
     static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
 
     private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
@@ -1347,7 +1328,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
     private static final long DEFAULT_PUSH_TIMEOUT = 0;
 
     @Nullable
-    private final Integer targetPartitionSize;
+    private final Integer maxRowsPerSegment;
     private final int maxRowsInMemory;
     private final long maxBytesInMemory;
     @Nullable
@@ -1388,7 +1369,8 @@ public class IndexTask extends AbstractTask implements ChatHandler
 
     @JsonCreator
     public IndexTuningConfig(
-        @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
+        @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
+        @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
         @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
         @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
         @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@@ -1412,7 +1394,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
     )
     {
       this(
-          targetPartitionSize,
+          maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment,
           maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
           maxBytesInMemory != null ? maxBytesInMemory : 0,
           maxTotalRows,
@@ -1430,6 +1412,11 @@ public class IndexTask extends AbstractTask implements ChatHandler
           maxParseExceptions,
           maxSavedParseExceptions
       );
+
+      Preconditions.checkArgument(
+          targetPartitionSize == null || maxRowsPerSegment == null,
+          "Can't use targetPartitionSize and maxRowsPerSegment together"
+      );
     }
 
     private IndexTuningConfig()
@@ -1438,7 +1425,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
     }
 
     private IndexTuningConfig(
-        @Nullable Integer targetPartitionSize,
+        @Nullable Integer maxRowsPerSegment,
         @Nullable Integer maxRowsInMemory,
         @Nullable Long maxBytesInMemory,
         @Nullable Long maxTotalRows,
@@ -1458,13 +1445,13 @@ public class IndexTask extends AbstractTask implements ChatHandler
     )
     {
       Preconditions.checkArgument(
-          targetPartitionSize == null || targetPartitionSize.equals(-1) || numShards == null || numShards.equals(-1),
-          "targetPartitionSize and numShards cannot both be set"
+          maxRowsPerSegment == null || maxRowsPerSegment.equals(-1) || numShards == null || numShards.equals(-1),
+          "maxRowsPerSegment and numShards cannot both be set"
       );
 
-      this.targetPartitionSize = (targetPartitionSize != null && targetPartitionSize == -1)
+      this.maxRowsPerSegment = (maxRowsPerSegment != null && maxRowsPerSegment == -1)
                                  ? null
-                                 : targetPartitionSize;
+                                 : maxRowsPerSegment;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
@@ -1505,7 +1492,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
     public IndexTuningConfig withBasePersistDirectory(File dir)
     {
       return new IndexTuningConfig(
-          targetPartitionSize,
+          maxRowsPerSegment,
           maxRowsInMemory,
           maxBytesInMemory,
           maxTotalRows,
@@ -1525,10 +1512,10 @@ public class IndexTask extends AbstractTask implements ChatHandler
       );
     }
 
-    public IndexTuningConfig withTargetPartitionSize(int targetPartitionSize)
+    public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment)
     {
       return new IndexTuningConfig(
-          targetPartitionSize,
+          maxRowsPerSegment,
           maxRowsInMemory,
           maxBytesInMemory,
           maxTotalRows,
@@ -1549,14 +1536,15 @@ public class IndexTask extends AbstractTask implements ChatHandler
     }
 
     /**
-     * Return the target number of rows per segment. This returns null if it's not specified in tuningConfig.
-     * Please use {@link IndexTask#getValidTargetPartitionSize} instead to get the valid value.
+     * Return the max number of rows per segment. This returns null if it's not specified in tuningConfig.
+     * Please use {@link IndexTask#getValidMaxRowsPerSegment} instead to get the valid value.
      */
     @Nullable
     @JsonProperty
-    public Integer getTargetPartitionSize()
+    @Override
+    public Integer getMaxRowsPerSegment()
     {
-      return targetPartitionSize;
+      return maxRowsPerSegment;
     }
 
     @JsonProperty
@@ -1701,7 +1689,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
              forceGuaranteedRollup == that.forceGuaranteedRollup &&
              reportParseExceptions == that.reportParseExceptions &&
              pushTimeout == that.pushTimeout &&
-             Objects.equals(targetPartitionSize, that.targetPartitionSize) &&
+             Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
              Objects.equals(numShards, that.numShards) &&
              Objects.equals(indexSpec, that.indexSpec) &&
              Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
@@ -1715,7 +1703,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
     public int hashCode()
     {
       return Objects.hash(
-          targetPartitionSize,
+          maxRowsPerSegment,
           maxRowsInMemory,
           maxTotalRows,
           numShards,
@@ -1737,7 +1725,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
     public String toString()
     {
       return "IndexTuningConfig{" +
-             "targetPartitionSize=" + targetPartitionSize +
+             "maxRowsPerSegment=" + maxRowsPerSegment +
              ", maxRowsInMemory=" + maxRowsInMemory +
              ", maxBytesInMemory=" + maxBytesInMemory +
              ", maxTotalRows=" + maxTotalRows +
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index be72fc4..18e87d7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -269,7 +269,7 @@ public class ParallelIndexSubTask extends AbstractTask
    *
    * <ul>
    * <li>
-   * If the number of rows in a segment exceeds {@link ParallelIndexTuningConfig#targetPartitionSize}
+   * If the number of rows in a segment exceeds {@link ParallelIndexTuningConfig#maxRowsPerSegment}
    * </li>
    * <li>
    * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link ParallelIndexTuningConfig#maxTotalRows}
@@ -304,7 +304,7 @@ public class ParallelIndexSubTask extends AbstractTask
 
     // Initialize maxRowsPerSegment and maxTotalRows lazily
     final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
-    @Nullable final Integer targetPartitionSize = IndexTask.getValidTargetPartitionSize(tuningConfig);
+    @Nullable final Integer maxRowsPerSegment = IndexTask.getValidMaxRowsPerSegment(tuningConfig);
     @Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig);
     final long pushTimeout = tuningConfig.getPushTimeout();
     final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
@@ -350,8 +350,7 @@ public class ParallelIndexSubTask extends AbstractTask
           final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
 
           if (addResult.isOk()) {
-            if (exceedMaxRowsInSegment(targetPartitionSize, addResult.getNumRowsInSegment()) ||
-                exceedMaxRowsInAppenderator(maxTotalRows, addResult.getTotalNumRowsInAppenderator())) {
+            if (addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) {
               // There can be some segments waiting for being published even though any rows won't be added to them.
               // If those segments are not published here, the available space in appenderator will be kept to be small
               // which makes the size of segments smaller.
@@ -385,23 +384,6 @@ public class ParallelIndexSubTask extends AbstractTask
     }
   }
 
-  private static boolean exceedMaxRowsInSegment(
-      @Nullable Integer maxRowsInSegment, // maxRowsInSegment can be null if numShards is set in indexTuningConfig
-      int numRowsInSegment
-  )
-  {
-    return maxRowsInSegment != null && maxRowsInSegment <= numRowsInSegment;
-  }
-
-  private static boolean exceedMaxRowsInAppenderator(
-      // maxRowsInAppenderator can be null if numShards is set in indexTuningConfig
-      @Nullable Long maxRowsInAppenderator,
-      long numRowsInAppenderator
-  )
-  {
-    return maxRowsInAppenderator != null && maxRowsInAppenderator <= numRowsInAppenderator;
-  }
-
   private static Appenderator newAppenderator(
       FireDepartmentMetrics metrics,
       TaskToolbox toolbox,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index aecbbc0..53f05c6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -290,7 +290,8 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
   private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningConfig tuningConfig)
   {
     return new IndexTuningConfig(
-        tuningConfig.getTargetPartitionSize(),
+        null,
+        tuningConfig.getMaxRowsPerSegment(),
         tuningConfig.getMaxRowsInMemory(),
         tuningConfig.getMaxBytesInMemory(),
         tuningConfig.getMaxTotalRows(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 8f6239d..85929db 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -70,13 +70,15 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
         null,
         null,
         null,
+        null,
         null
     );
   }
 
   @JsonCreator
   public ParallelIndexTuningConfig(
-      @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
+      @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@@ -100,6 +102,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
   {
     super(
         targetPartitionSize,
+        maxRowsPerSegment,
         maxRowsInMemory,
         maxBytesInMemory,
         maxTotalRows,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 816949c..3ebfe59 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -147,7 +147,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
 
   @Override
   @JsonProperty
-  public int getMaxRowsPerSegment()
+  public Integer getMaxRowsPerSegment()
   {
     return maxRowsPerSegment;
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 5020baa..54796a5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.actions.LocalTaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.CompactionTask.Builder;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
@@ -120,24 +121,18 @@ public class CompactionTaskRunTest extends IngestionTestBase
   {
     runIndexTask();
 
-    final CompactionTask compactionTask = new CompactionTask(
-        null,
-        null,
+    final Builder builder = new Builder(
         DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
 
+    final CompactionTask compactionTask = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .build();
+
     final Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask);
 
     Assert.assertTrue(resultPair.lhs.isSuccess());
@@ -156,24 +151,19 @@ public class CompactionTaskRunTest extends IngestionTestBase
   {
     runIndexTask();
 
-    final CompactionTask compactionTask1 = new CompactionTask(
-        null,
-        null,
+    final Builder builder = new Builder(
         DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        false,
-        null,
-        null,
-        null,
-        null,
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
 
+    final CompactionTask compactionTask1 = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .keepSegmentGranularity(false)
+        .build();
+
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
 
     Assert.assertTrue(resultPair.lhs.isSuccess());
@@ -184,23 +174,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
     Assert.assertEquals(Intervals.of("2014-01-01/2014-01-02"), segments.get(0).getInterval());
     Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
 
-    final CompactionTask compactionTask2 = new CompactionTask(
-        null,
-        null,
-        DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        false,
-        null,
-        null,
-        null,
-        null,
-        getObjectMapper(),
-        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        null,
-        rowIngestionMetersFactory
-    );
+    final CompactionTask compactionTask2 = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .keepSegmentGranularity(false)
+        .build();
 
     resultPair = runTask(compactionTask2);
 
@@ -218,24 +195,19 @@ public class CompactionTaskRunTest extends IngestionTestBase
   {
     runIndexTask();
 
-    final CompactionTask compactionTask1 = new CompactionTask(
-        null,
-        null,
+    final Builder builder = new Builder(
         DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        true,
-        null,
-        null,
-        null,
-        null,
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
 
+    final CompactionTask compactionTask1 = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .keepSegmentGranularity(true)
+        .build();
+
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
 
     Assert.assertTrue(resultPair.lhs.isSuccess());
@@ -248,23 +220,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
       Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(i).getShardSpec());
     }
 
-    final CompactionTask compactionTask2 = new CompactionTask(
-        null,
-        null,
-        DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        true,
-        null,
-        null,
-        null,
-        null,
-        getObjectMapper(),
-        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        null,
-        rowIngestionMetersFactory
-    );
+    final CompactionTask compactionTask2 = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .keepSegmentGranularity(true)
+        .build();
 
     resultPair = runTask(compactionTask2);
 
@@ -284,25 +243,20 @@ public class CompactionTaskRunTest extends IngestionTestBase
   {
     runIndexTask();
 
-    // day segmentGranularity
-    final CompactionTask compactionTask1 = new CompactionTask(
-        null,
-        null,
+    final Builder builder = new Builder(
         DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        null,
-        Granularities.DAY,
-        null,
-        null,
-        null,
         getObjectMapper(),
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
 
+    // day segmentGranularity
+    final CompactionTask compactionTask1 = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .segmentGranularity(Granularities.DAY)
+        .build();
+
     Pair<TaskStatus, List<DataSegment>> resultPair = runTask(compactionTask1);
 
     Assert.assertTrue(resultPair.lhs.isSuccess());
@@ -315,23 +269,10 @@ public class CompactionTaskRunTest extends IngestionTestBase
     Assert.assertEquals(new NumberedShardSpec(0, 0), segments.get(0).getShardSpec());
 
     // hour segmentGranularity
-    final CompactionTask compactionTask2 = new CompactionTask(
-        null,
-        null,
-        DATA_SOURCE,
-        Intervals.of("2014-01-01/2014-01-02"),
-        null,
-        null,
-        null,
-        Granularities.HOUR,
-        null,
-        null,
-        null,
-        getObjectMapper(),
-        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        null,
-        rowIngestionMetersFactory
-    );
+    final CompactionTask compactionTask2 = builder
+        .interval(Intervals.of("2014-01-01/2014-01-02"))
+        .segmentGranularity(Granularities.HOUR)
+        .build();
 
     resultPair = runTask(compactionTask2);
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 867cdfe..f759cc7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.CompactionTask.Builder;
 import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager;
 import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
@@ -64,6 +65,8 @@ import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
+import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.aggregation.first.FloatFirstAggregatorFactory;
@@ -122,7 +125,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -147,7 +150,7 @@ public class CompactionTaskTest
   private static final IndexTuningConfig TUNING_CONFIG = createTuningConfig();
 
   private static Map<String, DimensionSchema> DIMENSIONS;
-  private static Map<String, AggregatorFactory> AGGREGATORS;
+  private static List<AggregatorFactory> AGGREGATORS;
   private static List<DataSegment> SEGMENTS;
   private static RowIngestionMetersFactory rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
@@ -168,7 +171,7 @@ public class CompactionTaskTest
     MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
 
     DIMENSIONS = new HashMap<>();
-    AGGREGATORS = new HashMap<>();
+    AGGREGATORS = new ArrayList<>();
 
     DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
     DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN));
@@ -193,11 +196,11 @@ public class CompactionTaskTest
       DIMENSIONS.put(schema.getName(), schema);
     }
 
-    AGGREGATORS.put("agg_0", new CountAggregatorFactory("agg_0"));
-    AGGREGATORS.put("agg_1", new LongSumAggregatorFactory("agg_1", "long_dim_1"));
-    AGGREGATORS.put("agg_2", new LongMaxAggregatorFactory("agg_2", "long_dim_2"));
-    AGGREGATORS.put("agg_3", new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
-    AGGREGATORS.put("agg_4", new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
+    AGGREGATORS.add(new CountAggregatorFactory("agg_0"));
+    AGGREGATORS.add(new LongSumAggregatorFactory("agg_1", "long_dim_1"));
+    AGGREGATORS.add(new LongMaxAggregatorFactory("agg_2", "long_dim_2"));
+    AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3"));
+    AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4"));
 
     segmentMap = new HashMap<>(SEGMENT_INTERVALS.size());
     for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
@@ -209,7 +212,7 @@ public class CompactionTaskTest
               "version",
               ImmutableMap.of(),
               findDimensions(i, segmentInterval),
-              new ArrayList<>(AGGREGATORS.keySet()),
+              AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()),
               new NumberedShardSpec(0, 1),
               0,
               SEGMENT_SIZE_BYTES
@@ -270,7 +273,8 @@ public class CompactionTaskTest
   private static IndexTuningConfig createTuningConfig()
   {
     return new IndexTuningConfig(
-        null, // null to compute targetPartitionSize automatically
+        null,
+        null, // null to compute maxRowsPerSegment automatically
         500000,
         1000000L,
         null,
@@ -327,66 +331,88 @@ public class CompactionTaskTest
   @Test
   public void testSerdeWithInterval() throws IOException
   {
-    final CompactionTask task = new CompactionTask(
-        null,
-        null,
+    final Builder builder = new Builder(
         DATA_SOURCE,
-        COMPACTION_INTERVAL,
-        null,
-        null,
-        null,
-        null,
-        null,
-        createTuningConfig(),
-        ImmutableMap.of("testKey", "testContext"),
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
+    final CompactionTask task = builder
+        .interval(COMPACTION_INTERVAL)
+        .tuningConfig(createTuningConfig())
+        .context(ImmutableMap.of("testKey", "testContext"))
+        .build();
+
     final byte[] bytes = objectMapper.writeValueAsBytes(task);
     final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
-    Assert.assertEquals(task.getType(), fromJson.getType());
-    Assert.assertEquals(task.getDataSource(), fromJson.getDataSource());
-    Assert.assertEquals(task.getInterval(), fromJson.getInterval());
-    Assert.assertEquals(task.getSegments(), fromJson.getSegments());
-    Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec());
-    Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
-    Assert.assertEquals(task.getContext(), fromJson.getContext());
-    Assert.assertNull(fromJson.getSegmentProvider().getSegments());
+    assertEquals(task, fromJson);
   }
 
   @Test
   public void testSerdeWithSegments() throws IOException
   {
-    final CompactionTask task = new CompactionTask(
-        null,
-        null,
+    final Builder builder = new Builder(
         DATA_SOURCE,
+        objectMapper,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
-        SEGMENTS,
-        null,
-        null,
-        null,
-        null,
-        createTuningConfig(),
-        ImmutableMap.of("testKey", "testContext"),
+        rowIngestionMetersFactory
+    );
+    final CompactionTask task = builder
+        .segments(SEGMENTS)
+        .tuningConfig(createTuningConfig())
+        .context(ImmutableMap.of("testKey", "testContext"))
+        .build();
+
+    final byte[] bytes = objectMapper.writeValueAsBytes(task);
+    final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
+    assertEquals(task, fromJson);
+  }
+
+  @Test
+  public void testSerdeWithDimensions() throws IOException
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
+
+    final CompactionTask task = builder
+        .segments(SEGMENTS)
+        .dimensionsSpec(
+            new DimensionsSpec(
+                ImmutableList.of(
+                    new StringDimensionSchema("dim1"),
+                    new StringDimensionSchema("dim2"),
+                    new StringDimensionSchema("dim3")
+                )
+            )
+        )
+        .tuningConfig(createTuningConfig())
+        .context(ImmutableMap.of("testKey", "testVal"))
+        .build();
+
     final byte[] bytes = objectMapper.writeValueAsBytes(task);
     final CompactionTask fromJson = objectMapper.readValue(bytes, CompactionTask.class);
-    Assert.assertEquals(task.getType(), fromJson.getType());
-    Assert.assertEquals(task.getDataSource(), fromJson.getDataSource());
-    Assert.assertEquals(task.getInterval(), fromJson.getInterval());
-    Assert.assertEquals(task.getSegments(), fromJson.getSegments());
-    Assert.assertEquals(task.getDimensionsSpec(), fromJson.getDimensionsSpec());
-    Assert.assertEquals(task.isKeepSegmentGranularity(), fromJson.isKeepSegmentGranularity());
-    Assert.assertEquals(task.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
-    Assert.assertEquals(task.getTuningConfig(), fromJson.getTuningConfig());
-    Assert.assertEquals(task.getContext(), fromJson.getContext());
+    assertEquals(task, fromJson);
+  }
+
+  private static void assertEquals(CompactionTask expected, CompactionTask actual)
+  {
+    Assert.assertEquals(expected.getType(), actual.getType());
+    Assert.assertEquals(expected.getDataSource(), actual.getDataSource());
+    Assert.assertEquals(expected.getInterval(), actual.getInterval());
+    Assert.assertEquals(expected.getSegments(), actual.getSegments());
+    Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec());
+    Assert.assertTrue(Arrays.equals(expected.getMetricsSpec(), actual.getMetricsSpec()));
+    Assert.assertEquals(expected.isKeepSegmentGranularity(), actual.isKeepSegmentGranularity());
+    Assert.assertEquals(expected.getTargetCompactionSizeBytes(), actual.getTargetCompactionSizeBytes());
+    Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig());
+    Assert.assertEquals(expected.getContext(), actual.getContext());
   }
 
   @Test
@@ -397,6 +423,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -413,12 +440,13 @@ public class CompactionTaskTest
           )
       );
       Assert.assertEquals(6, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, Granularities.MONTH);
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH);
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           Collections.singletonList(COMPACTION_INTERVAL),
           Granularities.ALL
       );
@@ -429,6 +457,7 @@ public class CompactionTaskTest
   public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException, SegmentLoadingException
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        null,
         6,
         500000,
         1000000L,
@@ -459,6 +488,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(null, tuningConfig),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -478,6 +508,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           SEGMENT_INTERVALS,
           tuningConfig,
           Granularities.MONTH
@@ -487,6 +518,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           Collections.singletonList(COMPACTION_INTERVAL),
           tuningConfig,
           Granularities.ALL
@@ -499,6 +531,7 @@ public class CompactionTaskTest
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
+        null,
         500000,
         1000000L,
         6L,
@@ -528,6 +561,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(null, tuningConfig),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -547,6 +581,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           SEGMENT_INTERVALS,
           tuningConfig,
           Granularities.MONTH
@@ -556,6 +591,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           Collections.singletonList(COMPACTION_INTERVAL),
           tuningConfig,
           Granularities.ALL
@@ -568,6 +604,7 @@ public class CompactionTaskTest
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
+        null,
         500000,
         1000000L,
         null,
@@ -597,6 +634,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(null, tuningConfig),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -616,6 +654,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           SEGMENT_INTERVALS,
           tuningConfig,
           Granularities.MONTH
@@ -625,6 +664,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           Collections.singletonList(COMPACTION_INTERVAL),
           tuningConfig,
           Granularities.ALL
@@ -667,6 +707,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         customSpec,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -685,6 +726,7 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           dimensionsSpecs,
+          AGGREGATORS,
           SEGMENT_INTERVALS,
           Granularities.MONTH
       );
@@ -693,6 +735,59 @@ public class CompactionTaskTest
       assertIngestionSchema(
           ingestionSpecs,
           Collections.singletonList(customSpec),
+          AGGREGATORS,
+          Collections.singletonList(COMPACTION_INTERVAL),
+          Granularities.ALL
+      );
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException, SegmentLoadingException
+  {
+    final AggregatorFactory[] customMetricsSpec = new AggregatorFactory[]{
+        new CountAggregatorFactory("custom_count"),
+        new LongSumAggregatorFactory("custom_long_sum", "agg_1"),
+        new FloatMinAggregatorFactory("custom_float_min", "agg_3"),
+        new DoubleMaxAggregatorFactory("custom_double_max", "agg_4")
+    };
+
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        new PartitionConfigurationManager(null, TUNING_CONFIG),
+        null,
+        customMetricsSpec,
+        keepSegmentGranularity,
+        null,
+        objectMapper
+    );
+
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      ingestionSpecs.sort(
+          (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
+              s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
+              s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
+          )
+      );
+      Assert.assertEquals(6, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Arrays.asList(customMetricsSpec),
+          SEGMENT_INTERVALS,
+          Granularities.MONTH
+      );
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          expectedDimensionsSpec,
+          Arrays.asList(customMetricsSpec),
           Collections.singletonList(COMPACTION_INTERVAL),
           Granularities.ALL
       );
@@ -707,6 +802,7 @@ public class CompactionTaskTest
         new SegmentProvider(SEGMENTS),
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -723,12 +819,13 @@ public class CompactionTaskTest
           )
       );
       Assert.assertEquals(6, ingestionSpecs.size());
-      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS, Granularities.MONTH);
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, AGGREGATORS, SEGMENT_INTERVALS, Granularities.MONTH);
     } else {
       Assert.assertEquals(1, ingestionSpecs.size());
       assertIngestionSchema(
           ingestionSpecs,
           expectedDimensionsSpec,
+          AGGREGATORS,
           Collections.singletonList(COMPACTION_INTERVAL),
           Granularities.ALL
       );
@@ -750,6 +847,7 @@ public class CompactionTaskTest
         new SegmentProvider(segments),
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -770,6 +868,7 @@ public class CompactionTaskTest
         new SegmentProvider(segments),
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -782,29 +881,24 @@ public class CompactionTaskTest
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
 
-    final CompactionTask task = new CompactionTask(
-        null,
-        null,
-        "foo",
-        Intervals.of("2000-01-01/2000-01-01"),
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
-        null,
+    final Builder builder = new Builder(
+        DATA_SOURCE,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
-        new NoopChatHandlerProvider(),
-        null
+        null,
+        rowIngestionMetersFactory
     );
+
+    final CompactionTask task = builder
+        .interval(Intervals.of("2000-01-01/2000-01-01"))
+        .build();
   }
 
   @Test
   public void testTargetPartitionSizeWithPartitionConfig() throws IOException, SegmentLoadingException
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
+        null,
         6,
         500000,
         1000000L,
@@ -837,6 +931,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(6L, tuningConfig),
         null,
+        null,
         keepSegmentGranularity,
         null,
         objectMapper
@@ -852,6 +947,7 @@ public class CompactionTaskTest
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
         null,
+        null,
         new PeriodGranularity(Period.months(3), null, null),
         objectMapper
     );
@@ -869,6 +965,7 @@ public class CompactionTaskTest
     assertIngestionSchema(
         ingestionSpecs,
         expectedDimensionsSpec,
+        AGGREGATORS,
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null)
     );
@@ -882,6 +979,7 @@ public class CompactionTaskTest
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         new PartitionConfigurationManager(null, TUNING_CONFIG),
         null,
+        null,
         false,
         new PeriodGranularity(Period.months(3), null, null),
         objectMapper
@@ -900,6 +998,7 @@ public class CompactionTaskTest
     assertIngestionSchema(
         ingestionSpecs,
         expectedDimensionsSpec,
+        AGGREGATORS,
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null)
     );
@@ -915,6 +1014,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         objectMapper
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
@@ -931,6 +1031,7 @@ public class CompactionTaskTest
     assertIngestionSchema(
         ingestionSpecs,
         expectedDimensionsSpec,
+        AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH
     );
@@ -941,23 +1042,21 @@ public class CompactionTaskTest
   {
     expectedException.expect(IAE.class);
     expectedException.expectMessage("keepSegmentGranularity and segmentGranularity can't be used together");
-    final CompactionTask task = new CompactionTask(
-        null,
-        null,
+
+    final Builder builder = new Builder(
         DATA_SOURCE,
-        COMPACTION_INTERVAL,
-        null,
-        null,
-        true,
-        Granularities.YEAR,
-        null,
-        createTuningConfig(),
-        ImmutableMap.of("testKey", "testContext"),
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         null,
         rowIngestionMetersFactory
     );
+    final CompactionTask task = builder
+        .interval(COMPACTION_INTERVAL)
+        .keepSegmentGranularity(true)
+        .segmentGranularity(Granularities.YEAR)
+        .tuningConfig(createTuningConfig())
+        .context(ImmutableMap.of("testKey", "testContext"))
+        .build();
   }
 
   private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
@@ -1013,6 +1112,7 @@ public class CompactionTaskTest
   private static void assertIngestionSchema(
       List<IndexIngestionSpec> ingestionSchemas,
       List<DimensionsSpec> expectedDimensionsSpecs,
+      List<AggregatorFactory> expectedMetricsSpec,
       List<Interval> expectedSegmentIntervals,
       Granularity expectedSegmentGranularity
   )
@@ -1020,8 +1120,10 @@ public class CompactionTaskTest
     assertIngestionSchema(
         ingestionSchemas,
         expectedDimensionsSpecs,
+        expectedMetricsSpec,
         expectedSegmentIntervals,
         new IndexTuningConfig(
+            null,
             41943040, // automatically computed targetPartitionSize
             500000,
             1000000L,
@@ -1054,6 +1156,7 @@ public class CompactionTaskTest
   private static void assertIngestionSchema(
       List<IndexIngestionSpec> ingestionSchemas,
       List<DimensionsSpec> expectedDimensionsSpecs,
+      List<AggregatorFactory> expectedMetricsSpec,
       List<Interval> expectedSegmentIntervals,
       IndexTuningConfig expectedTuningConfig,
       Granularity expectedSegmentGranularity
@@ -1082,11 +1185,13 @@ public class CompactionTaskTest
           new HashSet<>(expectedDimensionsSpec.getDimensions()),
           new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
       );
-      final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
-                                                                    .stream()
-                                                                    .map(AggregatorFactory::getCombiningFactory)
-                                                                    .collect(Collectors.toSet());
-      Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
+
+      // metrics
+      final List<AggregatorFactory> expectedAggregators = expectedMetricsSpec
+          .stream()
+          .map(AggregatorFactory::getCombiningFactory)
+          .collect(Collectors.toList());
+      Assert.assertEquals(expectedAggregators, Arrays.asList(dataSchema.getAggregators()));
       Assert.assertEquals(
           new UniformGranularitySpec(
               expectedSegmentGranularity,
@@ -1107,16 +1212,10 @@ public class CompactionTaskTest
       Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
       Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
 
-      // check the order of dimensions
       Assert.assertEquals(
           new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
           new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
       );
-      // check the order of metrics
-      Assert.assertEquals(
-          Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
-          ingestSegmentFirehoseFactory.getMetrics()
-      );
 
       // assert tuningConfig
       Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
@@ -1221,9 +1320,14 @@ public class CompactionTaskTest
             columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())));
           } else if (DIMENSIONS.containsKey(columnName)) {
             columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName)));
-          } else if (AGGREGATORS.containsKey(columnName)) {
-            columnMap.put(columnName, createColumn(AGGREGATORS.get(columnName)));
-            aggregatorFactories.add(AGGREGATORS.get(columnName));
+          } else {
+            final Optional<AggregatorFactory> maybeMetric = AGGREGATORS.stream()
+                                                                       .filter(agg -> agg.getName().equals(columnName))
+                                                                       .findAny();
+            if (maybeMetric.isPresent()) {
+              columnMap.put(columnName, createColumn(maybeMetric.get()));
+              aggregatorFactories.add(maybeMetric.get());
+            }
           }
         }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index b07759e..cd05698 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -242,7 +242,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithTargetPartitionSize(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, false, true),
             false
         ),
         null,
@@ -288,7 +288,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithTargetPartitionSize(2, true, true),
+            createTuningConfigWithMaxRowsPerSegment(2, true, true),
             false
         ),
         null,
@@ -340,7 +340,7 @@ public class IndexTaskTest
                 )
             ),
             null,
-            createTuningConfigWithTargetPartitionSize(2, true, false),
+            createTuningConfigWithMaxRowsPerSegment(2, true, false),
             false
         ),
         null,
@@ -384,7 +384,7 @@ public class IndexTaskTest
                 Granularities.MINUTE,
                 Collections.singletonList(Intervals.of("2014-01-01/2014-01-02"))
             ),
-            createTuningConfigWithTargetPartitionSize(10, false, true),
+            createTuningConfigWithMaxRowsPerSegment(10, false, true),
             false
         ),
         null,
@@ -421,7 +421,7 @@ public class IndexTaskTest
                 Granularities.HOUR,
                 Collections.singletonList(Intervals.of("2014-01-01T08:00:00Z/2014-01-01T09:00:00Z"))
             ),
-            createTuningConfigWithTargetPartitionSize(50, false, true),
+            createTuningConfigWithMaxRowsPerSegment(50, false, true),
             false
         ),
         null,
@@ -567,7 +567,7 @@ public class IndexTaskTest
             tmpDir,
             null,
             null,
-            createTuningConfigWithTargetPartitionSize(2, false, false),
+            createTuningConfigWithMaxRowsPerSegment(2, false, false),
             true
         ),
         null,
@@ -617,7 +617,7 @@ public class IndexTaskTest
                 Granularities.MINUTE,
                 null
             ),
-            createTuningConfigWithTargetPartitionSize(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, false, true),
             false
         ),
         null,
@@ -680,7 +680,7 @@ public class IndexTaskTest
                 0
             ),
             null,
-            createTuningConfigWithTargetPartitionSize(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, false, true),
             false
         ),
         null,
@@ -732,7 +732,7 @@ public class IndexTaskTest
                 0
             ),
             null,
-            createTuningConfigWithTargetPartitionSize(2, false, true),
+            createTuningConfigWithMaxRowsPerSegment(2, false, true),
             false
         ),
         null,
@@ -1042,6 +1042,7 @@ public class IndexTaskTest
     }
 
     final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
+        null,
         2,
         null,
         null,
@@ -1164,6 +1165,7 @@ public class IndexTaskTest
 
     // Allow up to 3 parse exceptions, and save up to 2 parse exceptions
     final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
+        null,
         2,
         null,
         null,
@@ -1279,6 +1281,7 @@ public class IndexTaskTest
 
     // Allow up to 3 parse exceptions, and save up to 2 parse exceptions
     final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
+        null,
         2,
         null,
         null,
@@ -1700,14 +1703,14 @@ public class IndexTaskTest
     );
   }
 
-  private static IndexTuningConfig createTuningConfigWithTargetPartitionSize(
-      int targetPartitionSize,
+  private static IndexTuningConfig createTuningConfigWithMaxRowsPerSegment(
+      int maxRowsPerSegment,
       boolean forceExtendableShardSpecs,
       boolean forceGuaranteedRollup
   )
   {
     return createTuningConfig(
-        targetPartitionSize,
+        maxRowsPerSegment,
         1,
         null,
         null,
@@ -1740,7 +1743,7 @@ public class IndexTaskTest
   }
 
   static IndexTuningConfig createTuningConfig(
-      @Nullable Integer targetPartitionSize,
+      @Nullable Integer maxRowsPerSegment,
       @Nullable Integer maxRowsInMemory,
       @Nullable Long maxBytesInMemory,
       @Nullable Long maxTotalRows,
@@ -1752,7 +1755,8 @@ public class IndexTaskTest
   )
   {
     return new IndexTask.IndexTuningConfig(
-        targetPartitionSize,
+        null,
+        maxRowsPerSegment,
         maxRowsInMemory,
         maxBytesInMemory,
         maxTotalRows,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index c4b3070..fc1b794 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -105,7 +105,7 @@ public class TaskSerdeTest
     Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
     Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory());
     Assert.assertNull(tuningConfig.getNumShards());
-    Assert.assertNull(tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getMaxRowsPerSegment());
   }
 
   @Test
@@ -116,7 +116,22 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
+    Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment());
+    Assert.assertNull(tuningConfig.getNumShards());
+
+    tuningConfig = jsonMapper.readValue(
+        "{\"type\":\"index\"}",
+        IndexTask.IndexTuningConfig.class
+    );
+
+    Assert.assertNull(tuningConfig.getMaxRowsPerSegment());
+
+    tuningConfig = jsonMapper.readValue(
+        "{\"type\":\"index\", \"maxRowsPerSegment\":10}",
+        IndexTask.IndexTuningConfig.class
+    );
+
+    Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment());
     Assert.assertNull(tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -124,7 +139,7 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertNull(tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getMaxRowsPerSegment());
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -132,7 +147,7 @@ public class TaskSerdeTest
         IndexTask.IndexTuningConfig.class
     );
 
-    Assert.assertNull(tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getMaxRowsPerSegment());
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
@@ -141,7 +156,7 @@ public class TaskSerdeTest
     );
 
     Assert.assertNull(tuningConfig.getNumShards());
-    Assert.assertEquals(10, (int) tuningConfig.getTargetPartitionSize());
+    Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment());
 
     tuningConfig = jsonMapper.readValue(
         "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":-1}",
@@ -149,7 +164,7 @@ public class TaskSerdeTest
     );
 
     Assert.assertNull(tuningConfig.getNumShards());
-    Assert.assertNull(tuningConfig.getTargetPartitionSize());
+    Assert.assertNull(tuningConfig.getMaxRowsPerSegment());
   }
 
   @Test
@@ -184,6 +199,7 @@ public class TaskSerdeTest
             ),
             new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
             new IndexTuningConfig(
+                null,
                 10000,
                 10,
                 null,
@@ -241,7 +257,7 @@ public class TaskSerdeTest
     Assert.assertEquals(taskTuningConfig.getMaxPendingPersists(), task2TuningConfig.getMaxPendingPersists());
     Assert.assertEquals(taskTuningConfig.getMaxRowsInMemory(), task2TuningConfig.getMaxRowsInMemory());
     Assert.assertEquals(taskTuningConfig.getNumShards(), task2TuningConfig.getNumShards());
-    Assert.assertEquals(taskTuningConfig.getTargetPartitionSize(), task2TuningConfig.getTargetPartitionSize());
+    Assert.assertEquals(taskTuningConfig.getMaxRowsPerSegment(), task2TuningConfig.getMaxRowsPerSegment());
     Assert.assertEquals(
         taskTuningConfig.isForceExtendableShardSpecs(),
         task2TuningConfig.isForceExtendableShardSpecs()
@@ -270,6 +286,7 @@ public class TaskSerdeTest
             ),
             new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
             new IndexTuningConfig(
+                null,
                 10000,
                 10,
                 null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index ff3caed..cc3ca97 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -430,6 +430,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
             null,
             null,
             null,
+            null,
             NUM_SUB_TASKS,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index dd90963..0fa747f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -138,6 +138,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
             null,
             null,
             null,
+            null,
             2,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 2f35901..1d25f73 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -246,6 +246,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
             null,
             null,
             null,
+            null,
             2,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 0f1a2f6..66f0ec9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -687,6 +687,7 @@ public class TaskLifecycleTest
             ),
             new IndexIOConfig(new MockFirehoseFactory(false), false),
             new IndexTuningConfig(
+                null,
                 10000,
                 10,
                 null,
@@ -768,6 +769,7 @@ public class TaskLifecycleTest
             ),
             new IndexIOConfig(new MockExceptionalFirehoseFactory(), false),
             new IndexTuningConfig(
+                null,
                 10000,
                 10,
                 null,
@@ -1156,6 +1158,7 @@ public class TaskLifecycleTest
             ),
             new IndexIOConfig(new MockFirehoseFactory(false), false),
             new IndexTuningConfig(
+                null,
                 10000,
                 10,
                 null,
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
index 8b3eab8..819caae 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
@@ -64,7 +64,7 @@
         },
         "tuningConfig": {
             "type": "index",
-            "targetPartitionSize": 3
+            "maxRowsPerSegment": 3
         }
     }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index eec9004..c87ff90 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +32,7 @@ public class ClientCompactQuery implements ClientQuery
   private final String dataSource;
   private final List<DataSegment> segments;
   private final boolean keepSegmentGranularity;
+  @Nullable
   private final Long targetCompactionSizeBytes;
   private final ClientCompactQueryTuningConfig tuningConfig;
   private final Map<String, Object> context;
@@ -40,7 +42,7 @@ public class ClientCompactQuery implements ClientQuery
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
       @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
-      @JsonProperty("targetCompactionSizeBytes") Long targetCompactionSizeBytes,
+      @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
       @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
       @JsonProperty("context") Map<String, Object> context
   )
@@ -80,6 +82,7 @@ public class ClientCompactQuery implements ClientQuery
   }
 
   @JsonProperty
+  @Nullable
   public Long getTargetCompactionSizeBytes()
   {
     return targetCompactionSizeBytes;
@@ -96,17 +99,4 @@ public class ClientCompactQuery implements ClientQuery
   {
     return context;
   }
-
-  @Override
-  public String toString()
-  {
-    return "ClientCompactQuery{" +
-           "dataSource='" + dataSource + '\'' +
-           ", segments=" + segments +
-           ", keepSegmentGranularity=" + keepSegmentGranularity +
-           ", targetCompactionSizeBytes=" + targetCompactionSizeBytes +
-           ", tuningConfig=" + tuningConfig +
-           ", context=" + context +
-           '}';
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
index a15d7fe..9bae161 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQueryTuningConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.client.indexing;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
 
 import javax.annotation.Nullable;
 import java.util.Objects;
@@ -33,28 +34,47 @@ public class ClientCompactQueryTuningConfig
   private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
   private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
   private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
-  private static final long DEFAULT_PUBLISH_TIMEOUT = 0;
+  private static final long DEFAULT_PUSH_TIMEOUT = 0;
 
+  @Nullable
+  private final Integer maxRowsPerSegment;
   private final int maxRowsInMemory;
   private final int maxTotalRows;
   private final IndexSpec indexSpec;
   private final int maxPendingPersists;
-  private final long publishTimeout;
+  private final long pushTimeout;
+
+  public static ClientCompactQueryTuningConfig from(
+      @Nullable UserCompactTuningConfig userCompactTuningConfig,
+      @Nullable Integer maxRowsPerSegment
+  )
+  {
+    return new ClientCompactQueryTuningConfig(
+        maxRowsPerSegment,
+        userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxRowsInMemory(),
+        userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxTotalRows(),
+        userCompactTuningConfig == null ? null : userCompactTuningConfig.getIndexSpec(),
+        userCompactTuningConfig == null ? null : userCompactTuningConfig.getMaxPendingPersists(),
+        userCompactTuningConfig == null ? null : userCompactTuningConfig.getPushTimeout()
+    );
+  }
 
   @JsonCreator
   public ClientCompactQueryTuningConfig(
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
-      @JsonProperty("publishTimeout") @Nullable Long publishTimeout
+      @JsonProperty("pushTimeout") @Nullable Long pushTimeout
   )
   {
+    this.maxRowsPerSegment = maxRowsPerSegment;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     this.maxTotalRows = maxTotalRows == null ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
     this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
     this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
-    this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout;
+    this.pushTimeout = pushTimeout == null ? DEFAULT_PUSH_TIMEOUT : pushTimeout;
   }
 
   @JsonProperty
@@ -64,6 +84,13 @@ public class ClientCompactQueryTuningConfig
   }
 
   @JsonProperty
+  @Nullable
+  public Integer getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @JsonProperty
   public int getMaxRowsInMemory()
   {
     return maxRowsInMemory;
@@ -88,9 +115,9 @@ public class ClientCompactQueryTuningConfig
   }
 
   @JsonProperty
-  public long getPublishTimeout()
+  public long getPushTimeout()
   {
-    return publishTimeout;
+    return pushTimeout;
   }
 
   @Override
@@ -99,47 +126,41 @@ public class ClientCompactQueryTuningConfig
     if (this == o) {
       return true;
     }
-
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    final ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o;
-
-    if (maxRowsInMemory != that.maxRowsInMemory) {
-      return false;
-    }
-
-    if (maxTotalRows != that.maxTotalRows) {
-      return false;
-    }
-
-    if (!indexSpec.equals(that.indexSpec)) {
-      return false;
-    }
-
-    if (maxPendingPersists != that.maxPendingPersists) {
-      return false;
-    }
-
-    return publishTimeout == that.publishTimeout;
+    ClientCompactQueryTuningConfig that = (ClientCompactQueryTuningConfig) o;
+    return maxRowsInMemory == that.maxRowsInMemory &&
+           maxTotalRows == that.maxTotalRows &&
+           maxPendingPersists == that.maxPendingPersists &&
+           pushTimeout == that.pushTimeout &&
+           Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
+           Objects.equals(indexSpec, that.indexSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, publishTimeout);
+    return Objects.hash(
+        maxRowsPerSegment,
+        maxRowsInMemory,
+        maxTotalRows,
+        indexSpec,
+        maxPendingPersists,
+        pushTimeout
+    );
   }
 
   @Override
   public String toString()
   {
     return "ClientCompactQueryTuningConfig{" +
-           "maxRowsInMemory='" + maxRowsInMemory +
-           ", maxTotalRows='" + maxTotalRows +
-           ", indexSpec='" + indexSpec +
-           ", maxPendingPersists='" + maxPendingPersists +
-           ", publishTimeout='" + publishTimeout +
-           "}";
+           "maxRowsPerSegment=" + maxRowsPerSegment +
+           ", maxRowsInMemory=" + maxRowsInMemory +
+           ", maxTotalRows=" + maxTotalRows +
+           ", indexSpec=" + indexSpec +
+           ", maxPendingPersists=" + maxPendingPersists +
+           ", pushTimeout=" + pushTimeout +
+           '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index b8960c0..d31ef1a 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -91,7 +91,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   public String compactSegments(
       List<DataSegment> segments,
       boolean keepSegmentGranularity,
-      long targetCompactionSizeBytes,
+      @Nullable Long targetCompactionSizeBytes,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 8a54bb4..905b810 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -39,7 +39,7 @@ public interface IndexingServiceClient
   String compactSegments(
       List<DataSegment> segments,
       boolean keepSegmentGranularity,
-      long targetCompactionSizeBytes,
+      @Nullable Long targetCompactionSizeBytes,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index cefc421..1e81792 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -45,7 +45,8 @@ public interface AppenderatorConfig
   /**
    * Maximum number of rows in a single segment before pushing to deep storage
    */
-  default int getMaxRowsPerSegment()
+  @Nullable
+  default Integer getMaxRowsPerSegment()
   {
     return Integer.MAX_VALUE;
   }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
index e80658e..7b45479 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
@@ -102,10 +102,17 @@ public class AppenderatorDriverAddResult
 
   public boolean isPushRequired(AppenderatorConfig tuningConfig)
   {
-    boolean overThreshold = getNumRowsInSegment() >= tuningConfig.getMaxRowsPerSegment();
-    Long maxTotal = tuningConfig.getMaxTotalRows();
-    if (maxTotal != null) {
-      overThreshold |= getTotalNumRowsInAppenderator() >= maxTotal;
+    return isPushRequired(tuningConfig.getMaxRowsPerSegment(), tuningConfig.getMaxTotalRows());
+  }
+
+  public boolean isPushRequired(@Nullable Integer maxRowsPerSegment, @Nullable Long maxTotalRows)
+  {
+    boolean overThreshold = false;
+    if (maxRowsPerSegment != null) {
+      overThreshold = getNumRowsInSegment() >= maxRowsPerSegment;
+    }
+    if (maxTotalRows != null) {
+      overThreshold |= getTotalNumRowsInAppenderator() >= maxTotalRows;
     }
     return overThreshold;
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index 8a56e31..ef33fb0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -20,9 +20,11 @@
 package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
+import org.apache.druid.segment.IndexSpec;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
@@ -44,27 +46,35 @@ public class DataSourceCompactionConfig
   private final boolean keepSegmentGranularity;
   private final int taskPriority;
   private final long inputSegmentSizeBytes;
-  private final long targetCompactionSizeBytes;
+  @Nullable
+  private final Long targetCompactionSizeBytes;
   // The number of input segments is limited because the byte size of a serialized task spec is limited by
   // RemoteTaskRunnerConfig.maxZnodeBytes.
+  @Nullable
+  private final Integer maxRowsPerSegment;
   private final int maxNumSegmentsToCompact;
   private final Period skipOffsetFromLatest;
-  private final ClientCompactQueryTuningConfig tuningConfig;
+  private final UserCompactTuningConfig tuningConfig;
   private final Map<String, Object> taskContext;
 
   @JsonCreator
   public DataSourceCompactionConfig(
       @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("keepSegmentGranularity") Boolean keepSegmentGranularity,
+      @JsonProperty("keepSegmentGranularity") @Nullable Boolean keepSegmentGranularity,
       @JsonProperty("taskPriority") @Nullable Integer taskPriority,
       @JsonProperty("inputSegmentSizeBytes") @Nullable Long inputSegmentSizeBytes,
       @JsonProperty("targetCompactionSizeBytes") @Nullable Long targetCompactionSizeBytes,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxNumSegmentsToCompact") @Nullable Integer maxNumSegmentsToCompact,
       @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
-      @JsonProperty("tuningConfig") @Nullable ClientCompactQueryTuningConfig tuningConfig,
+      @JsonProperty("tuningConfig") @Nullable UserCompactTuningConfig tuningConfig,
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
   {
+    Preconditions.checkArgument(
+        targetCompactionSizeBytes == null || maxRowsPerSegment == null,
+        "targetCompactionSizeBytes and maxRowsPerSegment in tuningConfig can't be used together"
+    );
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.keepSegmentGranularity = keepSegmentGranularity == null
                                   ? DEFAULT_KEEP_SEGMENT_GRANULARITY
@@ -75,9 +85,12 @@ public class DataSourceCompactionConfig
     this.inputSegmentSizeBytes = inputSegmentSizeBytes == null
                                  ? DEFAULT_INPUT_SEGMENT_SIZE_BYTES
                                  : inputSegmentSizeBytes;
-    this.targetCompactionSizeBytes = targetCompactionSizeBytes == null
-                                     ? DEFAULT_TARGET_COMPACTION_SIZE_BYTES
-                                     : targetCompactionSizeBytes;
+    if (targetCompactionSizeBytes == null && maxRowsPerSegment == null) {
+      this.targetCompactionSizeBytes = DEFAULT_TARGET_COMPACTION_SIZE_BYTES;
+    } else {
+      this.targetCompactionSizeBytes = targetCompactionSizeBytes;
+    }
+    this.maxRowsPerSegment = maxRowsPerSegment;
     this.maxNumSegmentsToCompact = maxNumSegmentsToCompact == null
                                    ? DEFAULT_NUM_INPUT_SEGMENTS
                                    : maxNumSegmentsToCompact;
@@ -122,12 +135,20 @@ public class DataSourceCompactionConfig
   }
 
   @JsonProperty
-  public long getTargetCompactionSizeBytes()
+  @Nullable
+  public Long getTargetCompactionSizeBytes()
   {
     return targetCompactionSizeBytes;
   }
 
   @JsonProperty
+  @Nullable
+  public Integer getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @JsonProperty
   public Period getSkipOffsetFromLatest()
   {
     return skipOffsetFromLatest;
@@ -135,7 +156,7 @@ public class DataSourceCompactionConfig
 
   @JsonProperty
   @Nullable
-  public ClientCompactQueryTuningConfig getTuningConfig()
+  public UserCompactTuningConfig getTuningConfig()
   {
     return tuningConfig;
   }
@@ -150,49 +171,22 @@ public class DataSourceCompactionConfig
   @Override
   public boolean equals(Object o)
   {
-    if (o == this) {
+    if (this == o) {
       return true;
     }
-
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    final DataSourceCompactionConfig that = (DataSourceCompactionConfig) o;
-
-    if (!dataSource.equals(that.dataSource)) {
-      return false;
-    }
-
-    if (keepSegmentGranularity != that.keepSegmentGranularity) {
-      return false;
-    }
-
-    if (taskPriority != that.taskPriority) {
-      return false;
-    }
-
-    if (inputSegmentSizeBytes != that.inputSegmentSizeBytes) {
-      return false;
-    }
-
-    if (maxNumSegmentsToCompact != that.maxNumSegmentsToCompact) {
-      return false;
-    }
-
-    if (targetCompactionSizeBytes != that.targetCompactionSizeBytes) {
-      return false;
-    }
-
-    if (!skipOffsetFromLatest.equals(that.skipOffsetFromLatest)) {
-      return false;
-    }
-
-    if (!Objects.equals(tuningConfig, that.tuningConfig)) {
-      return false;
-    }
-
-    return Objects.equals(taskContext, that.taskContext);
+    DataSourceCompactionConfig that = (DataSourceCompactionConfig) o;
+    return keepSegmentGranularity == that.keepSegmentGranularity &&
+           taskPriority == that.taskPriority &&
+           inputSegmentSizeBytes == that.inputSegmentSizeBytes &&
+           maxNumSegmentsToCompact == that.maxNumSegmentsToCompact &&
+           Objects.equals(dataSource, that.dataSource) &&
+           Objects.equals(targetCompactionSizeBytes, that.targetCompactionSizeBytes) &&
+           Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
+           Objects.equals(tuningConfig, that.tuningConfig) &&
+           Objects.equals(taskContext, that.taskContext);
   }
 
   @Override
@@ -203,11 +197,34 @@ public class DataSourceCompactionConfig
         keepSegmentGranularity,
         taskPriority,
         inputSegmentSizeBytes,
-        maxNumSegmentsToCompact,
         targetCompactionSizeBytes,
+        maxNumSegmentsToCompact,
         skipOffsetFromLatest,
         tuningConfig,
         taskContext
     );
   }
+
+  public static class UserCompactTuningConfig extends ClientCompactQueryTuningConfig
+  {
+    @JsonCreator
+    public UserCompactTuningConfig(
+        @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+        @JsonProperty("maxTotalRows") @Nullable Integer maxTotalRows,
+        @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+        @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+        @JsonProperty("pushTimeout") @Nullable Long pushTimeout
+    )
+    {
+      super(null, maxRowsInMemory, maxTotalRows, indexSpec, maxPendingPersists, pushTimeout);
+    }
+
+    @Override
+    @Nullable
+    @JsonIgnore
+    public Integer getMaxRowsPerSegment()
+    {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index ec189c0..937f5b0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.helper;
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.druid.client.indexing.ClientCompactQuery;
+import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
@@ -179,14 +180,13 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
 
       if (segmentsToCompact.size() > 1) {
         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
-        // Currently set keepSegmentGranularity to false because it breaks the algorithm of CompactionSegmentIterator to
-        // find segments to be compacted.
+        // Build the task's tuningConfig from the user's tuningConfig plus the configured maxRowsPerSegment.
         final String taskId = indexingServiceClient.compactSegments(
             segmentsToCompact,
             config.isKeepSegmentGranularity(),
             config.getTargetCompactionSizeBytes(),
             config.getTaskPriority(),
-            config.getTuningConfig(),
+            ClientCompactQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
             config.getTaskContext()
         );
         LOG.info(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 8c0d80d..6a09f5b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -303,7 +303,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
             segmentsToCompact.clear();
             log.warn(
                 "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
-                + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
+                + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
                 + "segments, consider increasing 'numTargetCompactionSegments' and "
                 + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.",
                 chunks.size(),
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 01f84df..91e1a8a 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -52,7 +52,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   public String compactSegments(
       List<DataSegment> segments,
       boolean keepSegmentGranularity,
-      long targetCompactionSizeBytes,
+      @Nullable Long targetCompactionSizeBytes,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
new file mode 100644
index 0000000..eae6043
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig.UserCompactTuningConfig;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DataSourceCompactionConfigTest
+{
+  private static final ObjectMapper objectMapper = new DefaultObjectMapper();
+
+  @Test
+  public void testSerdeBasic() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        100L,
+        null,
+        20,
+        new Period(3600),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+    final String json = objectMapper.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getTargetCompactionSizeBytes(), fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+  }
+
+  @Test
+  public void testSerdeWithMaxRowsPerSegment() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        null,
+        500L,
+        null,
+        30,
+        20,
+        new Period(3600),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+    final String json = objectMapper.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = objectMapper.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertTrue(fromJson.isKeepSegmentGranularity());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertNull(fromJson.getTargetCompactionSizeBytes());
+    Assert.assertEquals(config.getMaxNumSegmentsToCompact(), fromJson.getMaxNumSegmentsToCompact());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+  }
+
+  @Test
+  public void testSerdeUserCompactTuningConfig() throws IOException
+  {
+    final UserCompactTuningConfig config = new UserCompactTuningConfig(null, null, null, null, null);
+    final String json = objectMapper.writeValueAsString(config);
+    // Check maxRowsPerSegment doesn't exist in the JSON string
+    Assert.assertFalse(json.contains("maxRowsPerSegment"));
+    final UserCompactTuningConfig fromJson = objectMapper.readValue(json, UserCompactTuningConfig.class);
+    Assert.assertEquals(config, fromJson);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 42518ea..711c181 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -45,6 +45,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -65,7 +66,7 @@ public class DruidCoordinatorSegmentCompactorTest
     public String compactSegments(
         List<DataSegment> segments,
         boolean keepSegmentGranularity,
-        long targetCompactionSizeBytes,
+        @Nullable Long targetCompactionSizeBytes,
         int compactionTaskPriority,
         ClientCompactQueryTuningConfig tuningConfig,
         Map<String, Object> context
@@ -463,6 +464,7 @@ public class DruidCoordinatorSegmentCompactorTest
               50L,
               50L,
               null,
+              null,
               new Period("PT1H"), // smaller than segment interval
               null,
               null
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 8fe3e37..6c6722f 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -689,6 +689,7 @@ public class NewestSegmentFirstPolicyTest
         0,
         targetCompactionSizeBytes,
         targetCompactionSizeBytes,
+        null,
         numTargetCompactionSegments,
         skipOffsetFromLatest,
         null,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org