Posted to commits@druid.apache.org by ji...@apache.org on 2019/07/31 00:24:48 UTC

[incubator-druid] branch master updated: Use PartitionsSpec for all task types (#8141)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 385f492  Use PartitionsSpec for all task types (#8141)
385f492 is described below

commit 385f492a555add279a8c6dd368954fde18c41dcb
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Jul 30 17:24:39 2019 -0700

    Use PartitionsSpec for all task types (#8141)
    
    * Use partitionsSpec for all task types
    
    * fix doc
    
    * fix typos and revert to use isPushRequired
    
    * address comments
    
    * move partitionsSpec to core
    
    * remove hadoopPartitionsSpec
---
 .../partitions/DimensionBasedPartitionsSpec.java   |  28 +-
 .../indexer/partitions/DynamicPartitionsSpec.java  |  97 ++++++
 .../indexer/partitions/HashedPartitionsSpec.java   | 179 ++++++++++
 .../druid/indexer/partitions/PartitionsSpec.java   |  70 ++++
 .../partitions/SingleDimensionPartitionsSpec.java  | 147 +++++++++
 docs/content/ingestion/hadoop.md                   |  14 +-
 docs/content/ingestion/native_tasks.md             |  50 ++-
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |   3 +-
 indexing-hadoop/pom.xml                            |  12 +-
 .../druid/indexer/DeterminePartitionsJob.java      |  11 +-
 .../HadoopDruidDetermineConfigurationJob.java      |  40 ++-
 .../druid/indexer/HadoopDruidIndexerConfig.java    |  16 +-
 .../apache/druid/indexer/HadoopTuningConfig.java   |  10 +-
 .../apache/druid/indexer/IndexingHadoopModule.java |   2 +-
 .../indexer/partitions/AbstractPartitionsSpec.java |  87 -----
 .../indexer/partitions/HashedPartitionsSpec.java   |  70 ----
 .../druid/indexer/partitions/PartitionsSpec.java   |  58 ----
 .../partitions/SingleDimensionPartitionsSpec.java  |  69 ----
 .../indexer/DetermineHashedPartitionsJobTest.java  |  14 +-
 .../druid/indexer/DeterminePartitionsJobTest.java  |  10 +-
 .../druid/indexer/HadoopIngestionSpecTest.java     |  30 +-
 .../partitions/HashedPartitionsSpecTest.java       |  82 ++---
 .../index/RealtimeAppenderatorTuningConfig.java    |  22 +-
 .../task/AppenderatorDriverRealtimeIndexTask.java  |   6 +-
 .../druid/indexing/common/task/CompactionTask.java |  26 +-
 .../druid/indexing/common/task/IndexTask.java      | 364 ++++++++++-----------
 .../task/batch/parallel/ParallelIndexSubTask.java  |  26 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   8 +-
 .../batch/parallel/ParallelIndexTuningConfig.java  |  26 +-
 .../SeekableStreamIndexTaskRunner.java             |   6 +-
 .../SeekableStreamIndexTaskTuningConfig.java       |  24 +-
 .../indexing/common/task/CompactionTaskTest.java   |  23 +-
 .../druid/indexing/common/task/IndexTaskTest.java  |  16 +-
 .../druid/indexing/common/task/TaskSerdeTest.java  |  17 +-
 .../ParallelIndexSupervisorTaskKillTest.java       |   1 +
 .../ParallelIndexSupervisorTaskResourceTest.java   |   1 +
 .../ParallelIndexSupervisorTaskSerdeTest.java      |   1 +
 .../parallel/ParallelIndexSupervisorTaskTest.java  |   2 +
 .../parallel/ParallelIndexTuningConfigTest.java    |   6 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |   6 +-
 .../appenderator/AppenderatorDriverAddResult.java  |   5 -
 41 files changed, 958 insertions(+), 727 deletions(-)

diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
similarity index 53%
copy from indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java
copy to core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
index 9e0d29c..b1f4e32 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
@@ -17,34 +17,14 @@
  * under the License.
  */
 
-package org.apache.druid.indexer;
+package org.apache.druid.indexer.partitions;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.inject.Binder;
-import org.apache.druid.initialization.DruidModule;
-
-import java.util.Collections;
 import java.util.List;
 
 /**
+ * PartitionsSpec based on dimension values.
  */
-public class IndexingHadoopModule implements DruidModule
+public interface DimensionBasedPartitionsSpec extends PartitionsSpec
 {
-  @Override
-  public List<? extends Module> getJacksonModules()
-  {
-    return Collections.<Module>singletonList(
-        new SimpleModule("IndexingHadoopModule")
-            .registerSubtypes(
-                new NamedType(HadoopyStringInputRowParser.class, "hadoopyString")
-            )
-    );
-  }
-
-  @Override
-  public void configure(Binder binder)
-  {
-  }
+  List<String> getPartitionDimensions();
 }
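
A minimal illustrative sketch (not part of this patch; both concrete classes are added elsewhere in this commit) of using the new interface polymorphically, with made-up dimension names:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;

    class DimensionBasedPartitionsSpecExample
    {
      public static void main(String[] args)
      {
        // Callers such as the Hadoop tuning config can read partition dimensions
        // without knowing the concrete spec type.
        DimensionBasedPartitionsSpec spec =
            new HashedPartitionsSpec(null, 10, Arrays.asList("country", "city"));
        List<String> dims = spec.getPartitionDimensions();
        System.out.println(dims); // [country, city]
      }
    }
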
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java
new file mode 100644
index 0000000..fb32663
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/DynamicPartitionsSpec.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer.partitions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Dynamically determine partitions in the middle of indexing.
+ */
+public class DynamicPartitionsSpec implements PartitionsSpec
+{
+  public static final long DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
+
+  private final int maxRowsPerSegment;
+  private final long maxTotalRows;
+
+  @JsonCreator
+  public DynamicPartitionsSpec(
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows
+  )
+  {
+    this.maxRowsPerSegment = PartitionsSpec.isEffectivelyNull(maxRowsPerSegment)
+                             ? DEFAULT_MAX_ROWS_PER_SEGMENT
+                             : maxRowsPerSegment;
+    this.maxTotalRows = PartitionsSpec.isEffectivelyNull(maxTotalRows) ? DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
+  }
+
+  @Override
+  @JsonProperty
+  public Integer getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @JsonProperty
+  public long getMaxTotalRows()
+  {
+    return maxTotalRows;
+  }
+
+  @Override
+  public boolean needsDeterminePartitions(boolean useForHadoopTask)
+  {
+    return false;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DynamicPartitionsSpec that = (DynamicPartitionsSpec) o;
+    return maxRowsPerSegment == that.maxRowsPerSegment &&
+           maxTotalRows == that.maxTotalRows;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxRowsPerSegment, maxTotalRows);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DynamicPartitionsSpec{" +
+           "maxRowsPerSegment=" + maxRowsPerSegment +
+           ", maxTotalRows=" + maxTotalRows +
+           '}';
+  }
+}
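
A minimal sketch (not part of this patch) of the defaulting behavior above: null arguments fall back to PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT and DEFAULT_MAX_TOTAL_ROWS.

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;

    class DynamicPartitionsSpecExample
    {
      public static void main(String[] args)
      {
        // Both properties omitted: defaults apply.
        DynamicPartitionsSpec spec = new DynamicPartitionsSpec(null, null);
        System.out.println(spec.getMaxRowsPerSegment()); // 5000000
        System.out.println(spec.getMaxTotalRows());      // 20000000
        // Dynamic partitioning never needs a determine-partitions pass.
        System.out.println(spec.needsDeterminePartitions(false)); // false
      }
    }
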
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
new file mode 100644
index 0000000..8002fb5
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer.partitions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
+{
+  private static final Logger LOG = new Logger(HashedPartitionsSpec.class);
+
+  @Nullable
+  private final Integer maxRowsPerSegment;
+  @Nullable
+  private final Integer numShards;
+  private final List<String> partitionDimensions;
+
+  public static HashedPartitionsSpec defaultSpec()
+  {
+    return new HashedPartitionsSpec(null, null, null, null);
+  }
+
+  public HashedPartitionsSpec(
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Integer numShards,
+      @Nullable List<String> partitionDimensions
+  )
+  {
+    this(null, maxRowsPerSegment, numShards, partitionDimensions);
+  }
+
+  @JsonCreator
+  public HashedPartitionsSpec(
+      @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("numShards") @Nullable Integer numShards,
+      @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions
+  )
+  {
+    Preconditions.checkArgument(
+        PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
+        "Can't set both targetPartitionSize and maxRowsPerSegment"
+    );
+    final Integer realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize;
+    Preconditions.checkArgument(
+        PartitionsSpec.isEffectivelyNull(realMaxRowsPerSegment) || PartitionsSpec.isEffectivelyNull(numShards),
+        "Can't use maxRowsPerSegment or targetPartitionSize and numShards together"
+    );
+    // Needs to determine partitions if the _given_ numShards is null
+    this.maxRowsPerSegment = getValidMaxRowsPerSegment(realMaxRowsPerSegment, numShards);
+    this.numShards = PartitionsSpec.isEffectivelyNull(numShards) ? null : numShards;
+    this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
+
+    Preconditions.checkArgument(
+        this.maxRowsPerSegment == null || this.maxRowsPerSegment > 0,
+        "maxRowsPerSegment[%s] should be positive",
+        this.maxRowsPerSegment
+    );
+    Preconditions.checkArgument(
+        this.numShards == null || this.numShards > 0,
+        "numShards[%s] should be positive",
+        this.numShards
+    );
+
+    final boolean needsPartitionDetermination = needsDeterminePartitions(numShards);
+    if (!needsPartitionDetermination) {
+      Preconditions.checkState(
+          this.maxRowsPerSegment == null,
+          "maxRowsPerSegment[%s] must be null if we don't need to determine partitions",
+          this.maxRowsPerSegment
+      );
+      Preconditions.checkState(
+          this.numShards != null,
+          "numShards must not be null if we don't need to determine partitions"
+      );
+    }
+  }
+
+  private static boolean needsDeterminePartitions(@Nullable Integer numShards)
+  {
+    return PartitionsSpec.isEffectivelyNull(numShards);
+  }
+
+  @Nullable
+  private static Integer getValidMaxRowsPerSegment(@Nullable Integer maxRowsPerSegment, @Nullable Integer numShards)
+  {
+    if (needsDeterminePartitions(numShards)) {
+      return PartitionsSpec.isEffectivelyNull(maxRowsPerSegment) ? null : maxRowsPerSegment;
+    } else {
+      if (!PartitionsSpec.isEffectivelyNull(maxRowsPerSegment)) {
+        LOG.warn("maxRowsPerSegment[%s] is ignored since numShards[%s] is specified", maxRowsPerSegment, numShards);
+      }
+      return null;
+    }
+  }
+
+  @Nullable
+  @Override
+  @JsonProperty
+  public Integer getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @Override
+  public boolean needsDeterminePartitions(boolean useForHadoopTask)
+  {
+    return useForHadoopTask ? maxRowsPerSegment != null : numShards == null;
+  }
+
+  @Nullable
+  @JsonProperty
+  public Integer getNumShards()
+  {
+    return numShards;
+  }
+
+  @Override
+  @JsonProperty
+  public List<String> getPartitionDimensions()
+  {
+    return partitionDimensions;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    HashedPartitionsSpec that = (HashedPartitionsSpec) o;
+    return Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
+           Objects.equals(numShards, that.numShards) &&
+           Objects.equals(partitionDimensions, that.partitionDimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxRowsPerSegment, numShards, partitionDimensions);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "HashedPartitionsSpec{" +
+           "maxRowsPerSegment=" + maxRowsPerSegment +
+           ", numShards=" + numShards +
+           ", partitionDimensions=" + partitionDimensions +
+           '}';
+  }
+}
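
A minimal sketch (not part of this patch) of the argument checks above: maxRowsPerSegment and numShards are mutually exclusive, and only the maxRowsPerSegment form requires a determine-partitions pass for non-Hadoop tasks.

    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;

    class HashedPartitionsSpecExample
    {
      public static void main(String[] args)
      {
        // Shard count given up front: maxRowsPerSegment stays null and no determine pass is needed.
        HashedPartitionsSpec byShards = new HashedPartitionsSpec(null, 10, null);
        System.out.println(byShards.getMaxRowsPerSegment());          // null
        System.out.println(byShards.needsDeterminePartitions(false)); // false

        // Row target given instead: the shard count must be determined from the data.
        HashedPartitionsSpec byRows = new HashedPartitionsSpec(1_000_000, null, null);
        System.out.println(byRows.needsDeterminePartitions(false));   // true

        // Specifying both fails the Preconditions check with an IllegalArgumentException:
        // new HashedPartitionsSpec(1_000_000, 10, null);
      }
    }
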
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
new file mode 100644
index 0000000..2f7396d
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer.partitions;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.annotation.Nullable;
+
+/**
+ * PartitionsSpec describes the secondary partitioning method for data ingestion.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "single_dim", value = SingleDimensionPartitionsSpec.class),
+    @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class), // for backward compatibility
+    @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class),
+    @JsonSubTypes.Type(name = "dynamic", value = DynamicPartitionsSpec.class)
+})
+public interface PartitionsSpec
+{
+  int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
+
+  /**
+   * Returns the max number of rows per segment.
+   * Implementations can have different default values, which could even be null.
+   * Callers should use the right value depending on the context if this returns null.
+   */
+  @Nullable
+  Integer getMaxRowsPerSegment();
+
+  /**
+   * Returns true if this partitionsSpec needs to determine the number of partitions before data ingestion can start.
+   * It should usually return true if perfect rollup is enforced but the number of partitions is not specified.
+   */
+  boolean needsDeterminePartitions(boolean useForHadoopTask);
+
+  /**
+   * '-1' is regarded as null for some historical reason.
+   */
+  static boolean isEffectivelyNull(@Nullable Integer val)
+  {
+    return val == null || val == -1;
+  }
+
+  /**
+   * '-1' is regarded as null for some historical reason.
+   */
+  static boolean isEffectivelyNull(@Nullable Long val)
+  {
+    return val == null || val == -1;
+  }
+}
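
A minimal sketch (not part of this patch) of how the "type" discriminator above resolves to the concrete specs; a plain Jackson ObjectMapper is used here only for illustration, while Druid tasks use their injected mapper.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
    import org.apache.druid.indexer.partitions.PartitionsSpec;

    class PartitionsSpecSerdeExample
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();

        // "dynamic" is one of the subtype names registered above.
        PartitionsSpec dynamic = mapper.readValue(
            "{\"type\": \"dynamic\", \"maxRowsPerSegment\": 1000000}",
            PartitionsSpec.class
        );
        System.out.println(dynamic instanceof DynamicPartitionsSpec); // true

        // No "type" property: falls back to defaultImpl, i.e. HashedPartitionsSpec.
        PartitionsSpec fallback = mapper.readValue("{}", PartitionsSpec.class);
        System.out.println(fallback instanceof HashedPartitionsSpec); // true
      }
    }
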
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
new file mode 100644
index 0000000..aa6ef87
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer.partitions;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec
+{
+  private final int maxRowsPerSegment;
+  private final int maxPartitionSize;
+  @Nullable
+  private final String partitionDimension;
+  private final boolean assumeGrouped;
+
+  public SingleDimensionPartitionsSpec(
+      int maxRowsPerSegment,
+      @Nullable Integer maxPartitionSize,
+      @Nullable String partitionDimension,
+      boolean assumeGrouped
+  )
+  {
+    this(null, maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped);
+  }
+
+  @JsonCreator
+  public SingleDimensionPartitionsSpec(
+      @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxPartitionSize") @Nullable Integer maxPartitionSize,
+      @JsonProperty("partitionDimension") @Nullable String partitionDimension,
+      @JsonProperty("assumeGrouped") boolean assumeGrouped // false by default
+  )
+  {
+    Preconditions.checkArgument(
+        PartitionsSpec.isEffectivelyNull(targetPartitionSize) || PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
+        "Can't set both targetPartitionSize and maxRowsPerSegment"
+    );
+    Preconditions.checkArgument(
+        !PartitionsSpec.isEffectivelyNull(targetPartitionSize) || !PartitionsSpec.isEffectivelyNull(maxRowsPerSegment),
+        "Either targetPartitionSize or maxRowsPerSegment must be specified"
+    );
+    final int realMaxRowsPerSegment = targetPartitionSize == null ? maxRowsPerSegment : targetPartitionSize;
+    Preconditions.checkArgument(realMaxRowsPerSegment > 0, "maxRowsPerSegment must be specified");
+    this.maxRowsPerSegment = realMaxRowsPerSegment;
+    this.maxPartitionSize = PartitionsSpec.isEffectivelyNull(maxPartitionSize)
+                            ? Math.addExact(realMaxRowsPerSegment, (int) (realMaxRowsPerSegment * 0.5))
+                            : maxPartitionSize;
+    this.partitionDimension = partitionDimension;
+    this.assumeGrouped = assumeGrouped;
+  }
+
+  @Override
+  @JsonProperty
+  public Integer getMaxRowsPerSegment()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @Override
+  public boolean needsDeterminePartitions(boolean useForHadoopTask)
+  {
+    return true;
+  }
+
+  @JsonProperty
+  public int getMaxPartitionSize()
+  {
+    return maxPartitionSize;
+  }
+
+  @JsonProperty
+  @Nullable
+  public String getPartitionDimension()
+  {
+    return partitionDimension;
+  }
+
+  @JsonProperty
+  public boolean isAssumeGrouped()
+  {
+    return assumeGrouped;
+  }
+
+  @Override
+  public List<String> getPartitionDimensions()
+  {
+    return partitionDimension == null ? Collections.emptyList() : Collections.singletonList(partitionDimension);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SingleDimensionPartitionsSpec that = (SingleDimensionPartitionsSpec) o;
+    return maxRowsPerSegment == that.maxRowsPerSegment &&
+           maxPartitionSize == that.maxPartitionSize &&
+           assumeGrouped == that.assumeGrouped &&
+           Objects.equals(partitionDimension, that.partitionDimension);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(maxRowsPerSegment, maxPartitionSize, partitionDimension, assumeGrouped);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SingleDimensionPartitionsSpec{" +
+           "maxRowsPerSegment=" + maxRowsPerSegment +
+           ", maxPartitionSize=" + maxPartitionSize +
+           ", partitionDimension='" + partitionDimension + '\'' +
+           ", assumeGrouped=" + assumeGrouped +
+           '}';
+  }
+}
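
A minimal sketch (not part of this patch) of the maxPartitionSize default above: when it is not set, it becomes 50% larger than maxRowsPerSegment. The dimension name is made up.

    import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;

    class SingleDimensionPartitionsSpecExample
    {
      public static void main(String[] args)
      {
        SingleDimensionPartitionsSpec spec =
            new SingleDimensionPartitionsSpec(5_000_000, null, "country", false);
        System.out.println(spec.getMaxPartitionSize());     // 7500000 (1.5 x maxRowsPerSegment)
        System.out.println(spec.getPartitionDimensions());  // [country]
        // Single-dimension range partitioning always requires a determine-partitions pass.
        System.out.println(spec.needsDeterminePartitions(true)); // true
      }
    }
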
diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md
index 1c0bfbc..1005260 100644
--- a/docs/content/ingestion/hadoop.md
+++ b/docs/content/ingestion/hadoop.md
@@ -182,7 +182,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |-----|----|-----------|--------|
 |workingPath|String|The working path to use for intermediate results (results between Hadoop jobs).|Only used by the [CLI Hadoop Indexer](../ingestion/command-line-hadoop-indexer.html). The default is '/tmp/druid-indexing'. This field must be null otherwise.|
 |version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)|
-|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hashed')|
+|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hadoop_hashed_partitions')|
 |maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
 |maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
 |leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)|
@@ -246,8 +246,8 @@ For Roaring bitmaps:
 ## Partitioning specification
 
 Segments are always partitioned based on timestamp (according to the granularitySpec) and may be further partitioned in
-some other way depending on partition type. Druid supports two types of partitioning strategies: "hashed" (based on the
-hash of all dimensions in each row), and "dimension" (based on ranges of a single dimension).
+some other way depending on partition type. Druid supports two types of partitioning strategies: `hadoop_hashed_partitions` (based on the
+hash of all dimensions in each row), and `hadoop_single_dim_partitions` (based on ranges of a single dimension).
 
 Hashed partitioning is recommended in most cases, as it will improve indexing performance and create more uniformly
 sized data segments relative to single-dimension partitioning.
@@ -256,7 +256,7 @@ sized data segments relative to single-dimension partitioning.
 
 ```json
   "partitionsSpec": {
-     "type": "hashed",
+     "type": "hadoop_hashed_partitions",
      "targetPartitionSize": 5000000
    }
 ```
@@ -269,7 +269,7 @@ The configuration options are:
 
 |Field|Description|Required|
 |--------|-----------|---------|
-|type|Type of partitionSpec to be used.|"hashed"|
+|type|Type of partitionSpec to be used.|"hadoop_hashed_partitions"|
 |targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|either this or numShards|
 |numShards|Specify the number of partitions directly, instead of a target partition size. Ingestion will run faster, since it can skip the step necessary to select a number of partitions automatically.|either this or targetPartitionSize|
 |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with numShards, will be ignored when targetPartitionSize is set|no|
@@ -278,7 +278,7 @@ The configuration options are:
 
 ```json
   "partitionsSpec": {
-     "type": "dimension",
+     "type": "hadoop_single_dim_partitions",
      "targetPartitionSize": 5000000
    }
 ```
@@ -293,7 +293,7 @@ The configuration options are:
 
 |Field|Description|Required|
 |--------|-----------|---------|
-|type|Type of partitionSpec to be used.|"dimension"|
+|type|Type of partitionSpec to be used.|"hadoop_single_dim_partitions"|
 |targetPartitionSize|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|yes|
 |maxPartitionSize|Maximum number of rows to include in a partition. Defaults to 50% larger than the targetPartitionSize.|no|
 |partitionDimension|The dimension to partition on. Leave blank to select a dimension automatically.|no|
diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index 1cf5e01..76dfd20 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -180,13 +180,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|The task type, this should always be `index_parallel`.|none|yes|
-|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
 |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
-|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
-|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
-|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
-|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as in [...]
+|partitionsSpec|Defines how to partition the segments within each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic_partitions`|no|
+|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
+|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as in [...]
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
 |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
 |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
@@ -197,6 +195,18 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |chatHandlerTimeout|Timeout for reporting the pushed segments in worker tasks.|PT10S|no|
 |chatHandlerNumRetries|Retries for reporting the pushed segments in worker tasks.|5|no|
 
+#### PartitionsSpec
+
+PartitionsSpec describes the secondary partitioning method.
+The Parallel Index Task supports only the best-effort rollup mode,
+and thus `dynamic_partitions` is currently the only available option.
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should always be `dynamic_partitions`|none|yes|
+|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
+|maxTotalRows|Total number of rows in segments waiting to be pushed. Used in determining when an intermediate segment push should occur.|20000000|no|
+
 #### HTTP Endpoints
 
 The supervisor task provides some HTTP endpoints to get running status.
@@ -558,14 +568,11 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|The task type, this should always be "index".|none|yes|
-|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
 |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
 |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
-|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
-|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
-|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
-|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
-|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as in [...]
+|partitionsSpec|Defines how to partition the segments within each timeChunk, see [PartitionsSpec](#partitionsspec)|`dynamic_partitions` if `forceGuaranteedRollup` = false, `hashed_partitions` if `forceGuaranteedRollup` = true|no|
+|indexSpec|Defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
+|indexSpecForIntermediatePersists|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as in [...]
 |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
 |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShard [...]
 |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no|
@@ -575,6 +582,27 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |maxParseExceptions|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|unlimited|no|
 |maxSavedParseExceptions|When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../ingestion/reports.html). Overridden if `reportParseExceptions` is set.|0|no|
 
+#### PartitionsSpec
+
+PartitionsSpec describes the secondary partitioning method.
+You should use a different partitionsSpec depending on the [rollup mode](../ingestion/index.html#roll-up-modes) you want.
+For perfect rollup, you should use `hashed_partitions`.
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should always be `hashed_partitions`|none|yes|
+|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
+|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
+|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions.|null|no|
+
+For best-effort rollup, you should use `dynamic_partitions`.
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should always be `dynamic_partitions`|none|yes|
+|maxRowsPerSegment|Used in sharding. Determines how many rows are in each segment.|5000000|no|
+|maxTotalRows|Total number of rows in segments waiting to be pushed. Used in determining when an intermediate segment push should occur.|20000000|no|
+
 #### IndexSpec
 
 The indexSpec defines segment storage format options to be used at indexing time, such as bitmap type and column
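
For illustration, a hedged tuningConfig fragment matching the perfect-rollup table above; the task fields and all values are examples only, and the spec type name follows the documentation added in this patch.

    "tuningConfig": {
      "type": "index",
      "forceGuaranteedRollup": true,
      "partitionsSpec": {
        "type": "hashed_partitions",
        "numShards": 10,
        "partitionDimensions": ["country"]
      }
    }
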
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 57b4f37..9388493 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
 import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -62,7 +63,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
-    Assert.assertEquals(null, config.getMaxTotalRows());
+    Assert.assertEquals(DynamicPartitionsSpec.DEFAULT_MAX_TOTAL_ROWS, config.getMaxTotalRows().longValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index 3a3f5ff..ea6f6c3 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -42,12 +42,12 @@
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-core</artifactId>
             <version>${project.parent.version}</version>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-api</artifactId>
-                    </exclusion>
-                </exclusions>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index 669366c5..984d104 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -126,7 +126,10 @@ public class DeterminePartitionsJob implements Jobby
         );
       }
 
-      if (!config.getPartitionsSpec().isAssumeGrouped()) {
+      final SingleDimensionPartitionsSpec partitionsSpec =
+          (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
+
+      if (!partitionsSpec.isAssumeGrouped()) {
         groupByJob = Job.getInstance(
             new Configuration(),
             StringUtils.format("%s-determine_partitions_groupby-%s", config.getDataSource(), config.getIntervals())
@@ -191,7 +194,7 @@ public class DeterminePartitionsJob implements Jobby
       JobHelper.injectSystemProperties(dimSelectionJob);
       config.addJobProperties(dimSelectionJob);
 
-      if (!config.getPartitionsSpec().isAssumeGrouped()) {
+      if (!partitionsSpec.isAssumeGrouped()) {
         // Read grouped data from the groupByJob.
         dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionPostGroupByMapper.class);
         dimSelectionJob.setInputFormatClass(SequenceFileInputFormat.class);
@@ -764,8 +767,10 @@ public class DeterminePartitionsJob implements Jobby
 
         // Make sure none of these shards are oversized
         boolean oversized = false;
+        final SingleDimensionPartitionsSpec partitionsSpec =
+            (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
         for (final DimPartition partition : dimPartitions.partitions) {
-          if (partition.rows > config.getMaxPartitionSize()) {
+          if (partition.rows > partitionsSpec.getMaxPartitionSize()) {
             log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
             oversized = true;
           }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index 534afcb..a25f274 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -21,6 +21,10 @@ package org.apache.druid.indexer;
 
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.joda.time.DateTime;
@@ -40,9 +44,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
   private String hadoopJobIdFile;
 
   @Inject
-  public HadoopDruidDetermineConfigurationJob(
-      HadoopDruidIndexerConfig config
-  )
+  public HadoopDruidDetermineConfigurationJob(HadoopDruidIndexerConfig config)
   {
     this.config = config;
   }
@@ -53,24 +55,32 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     JobHelper.ensurePaths(config);
 
     if (config.isDeterminingPartitions()) {
-      job = config.getPartitionsSpec().getPartitionJob(config);
+      job = createPartitionJob(config);
       config.setHadoopJobIdFileName(hadoopJobIdFile);
       return JobHelper.runSingleJob(job, config);
     } else {
-      int shardsPerInterval = config.getPartitionsSpec().getNumShards();
+      final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
+      final int shardsPerInterval;
+      if (partitionsSpec instanceof HashedPartitionsSpec) {
+        final HashedPartitionsSpec hashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec;
+        shardsPerInterval = PartitionsSpec.isEffectivelyNull(hashedPartitionsSpec.getNumShards())
+                            ? 1
+                            : hashedPartitionsSpec.getNumShards();
+      } else {
+        shardsPerInterval = 1;
+      }
       Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>();
       int shardCount = 0;
       for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
         DateTime bucket = segmentGranularity.getStart();
         // negative shardsPerInterval means a single shard
-        final int realShardsPerInterval = shardsPerInterval < 0 ? 1 : shardsPerInterval;
-        List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(realShardsPerInterval);
-        for (int i = 0; i < realShardsPerInterval; i++) {
+        List<HadoopyShardSpec> specs = Lists.newArrayListWithCapacity(shardsPerInterval);
+        for (int i = 0; i < shardsPerInterval; i++) {
           specs.add(
               new HadoopyShardSpec(
                   new HashBasedNumberedShardSpec(
                       i,
-                      realShardsPerInterval,
+                      shardsPerInterval,
                       config.getPartitionsSpec().getPartitionDimensions(),
                       HadoopDruidIndexerConfig.JSON_MAPPER
                   ),
@@ -86,6 +96,18 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     }
   }
 
+  private static Jobby createPartitionJob(HadoopDruidIndexerConfig config)
+  {
+    final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
+    if (partitionsSpec instanceof HashedPartitionsSpec) {
+      return new DetermineHashedPartitionsJob(config);
+    } else if (partitionsSpec instanceof SingleDimensionPartitionsSpec) {
+      return new DeterminePartitionsJob(config);
+    } else {
+      throw new ISE("Unknown partitionsSpec[%s]", partitionsSpec);
+    }
+  }
+
   @Override
   public Map<String, Object> getStats()
   {
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
index 4aa8cd2..37e8117 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java
@@ -39,7 +39,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
 import org.apache.druid.indexer.path.PathSpec;
 import org.apache.druid.initialization.Initialization;
 import org.apache.druid.java.util.common.DateTimes;
@@ -289,7 +289,7 @@ public class HadoopDruidIndexerConfig
     this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
   }
 
-  public PartitionsSpec getPartitionsSpec()
+  public DimensionBasedPartitionsSpec getPartitionsSpec()
   {
     return schema.getTuningConfig().getPartitionsSpec();
   }
@@ -327,12 +327,13 @@ public class HadoopDruidIndexerConfig
 
   public boolean isDeterminingPartitions()
   {
-    return schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions();
+    return schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true);
   }
 
-  public Long getTargetPartitionSize()
+  public int getTargetPartitionSize()
   {
-    return schema.getTuningConfig().getPartitionsSpec().getTargetPartitionSize();
+    final Integer targetPartitionSize = schema.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment();
+    return targetPartitionSize == null ? -1 : targetPartitionSize;
   }
 
   public boolean isForceExtendableShardSpecs()
@@ -340,11 +341,6 @@ public class HadoopDruidIndexerConfig
     return schema.getTuningConfig().isForceExtendableShardSpecs();
   }
 
-  public long getMaxPartitionSize()
-  {
-    return schema.getTuningConfig().getPartitionsSpec().getMaxPartitionSize();
-  }
-
   public boolean isUpdaterJobSpecSet()
   {
     return (schema.getIOConfig().getMetadataUpdateSpec() != null);
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index e61f912..704f4c7 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
-import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -40,7 +40,7 @@ import java.util.Map;
 @JsonTypeName("hadoop")
 public class HadoopTuningConfig implements TuningConfig
 {
-  private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
+  private static final DimensionBasedPartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.defaultSpec();
   private static final Map<Long, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
   private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
   private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
@@ -79,7 +79,7 @@ public class HadoopTuningConfig implements TuningConfig
 
   private final String workingPath;
   private final String version;
-  private final PartitionsSpec partitionsSpec;
+  private final DimensionBasedPartitionsSpec partitionsSpec;
   private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
   private final IndexSpec indexSpec;
   private final IndexSpec indexSpecForIntermediatePersists;
@@ -104,7 +104,7 @@ public class HadoopTuningConfig implements TuningConfig
   public HadoopTuningConfig(
       final @JsonProperty("workingPath") String workingPath,
       final @JsonProperty("version") String version,
-      final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
+      final @JsonProperty("partitionsSpec") DimensionBasedPartitionsSpec partitionsSpec,
       final @JsonProperty("shardSpecs") Map<Long, List<HadoopyShardSpec>> shardSpecs,
       final @JsonProperty("indexSpec") IndexSpec indexSpec,
       final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
@@ -187,7 +187,7 @@ public class HadoopTuningConfig implements TuningConfig
   }
 
   @JsonProperty
-  public PartitionsSpec getPartitionsSpec()
+  public DimensionBasedPartitionsSpec getPartitionsSpec()
   {
     return partitionsSpec;
   }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java
index 9e0d29c..1d7185e 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexingHadoopModule.java
@@ -36,7 +36,7 @@ public class IndexingHadoopModule implements DruidModule
   public List<? extends Module> getJacksonModules()
   {
     return Collections.<Module>singletonList(
-        new SimpleModule("IndexingHadoopModule")
+        new SimpleModule(getClass().getSimpleName())
             .registerSubtypes(
                 new NamedType(HadoopyStringInputRowParser.class, "hadoopyString")
             )
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/AbstractPartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/AbstractPartitionsSpec.java
deleted file mode 100644
index 2881279..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/AbstractPartitionsSpec.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.partitions;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-
-
-public abstract class AbstractPartitionsSpec implements PartitionsSpec
-{
-  private static final double DEFAULT_OVERSIZE_THRESHOLD = 1.5;
-  private static final long DEFAULT_TARGET_PARTITION_SIZE = -1;
-
-  private final long targetPartitionSize;
-  private final long maxPartitionSize;
-  private final boolean assumeGrouped;
-  private final int numShards;
-
-  public AbstractPartitionsSpec(
-      Long targetPartitionSize,
-      Long maxPartitionSize,
-      Boolean assumeGrouped,
-      Integer numShards
-  )
-  {
-    this.targetPartitionSize = targetPartitionSize == null ? DEFAULT_TARGET_PARTITION_SIZE : targetPartitionSize;
-    this.maxPartitionSize = maxPartitionSize == null
-                            ? (long) (this.targetPartitionSize * DEFAULT_OVERSIZE_THRESHOLD)
-                            : maxPartitionSize;
-    this.assumeGrouped = assumeGrouped == null ? false : assumeGrouped;
-    this.numShards = numShards == null ? -1 : numShards;
-    Preconditions.checkArgument(
-        this.targetPartitionSize == -1 || this.numShards == -1,
-        "targetPartitionsSize and shardCount both cannot be set"
-    );
-  }
-
-  @Override
-  @JsonProperty
-  public long getTargetPartitionSize()
-  {
-    return targetPartitionSize;
-  }
-
-  @Override
-  @JsonProperty
-  public long getMaxPartitionSize()
-  {
-    return maxPartitionSize;
-  }
-
-  @Override
-  @JsonProperty
-  public boolean isAssumeGrouped()
-  {
-    return assumeGrouped;
-  }
-
-  @Override
-  public boolean isDeterminingPartitions()
-  {
-    return targetPartitionSize > 0;
-  }
-
-  @Override
-  public int getNumShards()
-  {
-    return numShards;
-  }
-}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
deleted file mode 100644
index 31905e4..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.partitions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.indexer.DetermineHashedPartitionsJob;
-import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.indexer.Jobby;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-public class HashedPartitionsSpec extends AbstractPartitionsSpec
-{
-  private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
-
-  public static HashedPartitionsSpec makeDefaultHashedPartitionsSpec()
-  {
-    return new HashedPartitionsSpec(null, null, null, null, null);
-  }
-
-  @JsonIgnore
-  private final List<String> partitionDimensions;
-
-  @JsonCreator
-  public HashedPartitionsSpec(
-      @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
-      @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped,
-      @JsonProperty("numShards") @Nullable Integer numShards,
-      @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions
-  )
-  {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, numShards);
-    this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
-  }
-
-  @Override
-  public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
-  {
-    return new DetermineHashedPartitionsJob(config);
-  }
-
-  @Override
-  @JsonProperty
-  public List<String> getPartitionDimensions()
-  {
-    return partitionDimensions;
-  }
-}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
deleted file mode 100644
index 44268e2..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.partitions;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.indexer.Jobby;
-
-import java.util.List;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = HashedPartitionsSpec.class)
-@JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "dimension", value = SingleDimensionPartitionsSpec.class),
-    @JsonSubTypes.Type(name = "hashed", value = HashedPartitionsSpec.class)
-})
-public interface PartitionsSpec
-{
-  @JsonIgnore
-  Jobby getPartitionJob(HadoopDruidIndexerConfig config);
-
-  @JsonProperty
-  long getTargetPartitionSize();
-
-  @JsonProperty
-  long getMaxPartitionSize();
-
-  @JsonProperty
-  boolean isAssumeGrouped();
-
-  @JsonIgnore
-  boolean isDeterminingPartitions();
-
-  @JsonProperty
-  int getNumShards();
-
-  @JsonProperty
-  List<String> getPartitionDimensions();
-}
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
deleted file mode 100644
index 5013044..0000000
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexer.partitions;
-
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.indexer.DeterminePartitionsJob;
-import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.indexer.Jobby;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
-{
-  @Nullable
-  private final String partitionDimension;
-
-  @JsonCreator
-  public SingleDimensionPartitionsSpec(
-      @JsonProperty("partitionDimension") @Nullable String partitionDimension,
-      @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
-      @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
-      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
-  )
-  {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
-    this.partitionDimension = partitionDimension;
-  }
-
-  @JsonProperty
-  @Nullable
-  public String getPartitionDimension()
-  {
-    return partitionDimension;
-  }
-
-  @Override
-  public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
-  {
-    return new DeterminePartitionsJob(config);
-  }
-
-  @Override
-  @JsonProperty
-  public List<String> getPartitionDimensions()
-  {
-    return ImmutableList.of();
-  }
-}
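
Note on the relocation: the three Hadoop-only partitions specs deleted above are replaced by the shared specs in the org.apache.druid.indexer.partitions package that the indexing-service changes below import. A minimal sketch of how the relocated specs are constructed, with argument order inferred only from the call sites updated later in this patch; the meaning of the null arguments is not visible in this excerpt and they are deliberately left unset:

    // Sketch only: constructor shapes mirror the call sites in this patch
    // (HadoopTuningConfig, IndexTuningConfig, RealtimeAppenderatorTuningConfig).
    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
    import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;

    public class RelocatedPartitionsSpecs
    {
      public static void main(String[] args)
      {
        // Hash partitioning sized by a target number of rows per segment.
        HashedPartitionsSpec hashedBySize = new HashedPartitionsSpec(100, null, null);
        // Hash partitioning with an explicit shard count.
        HashedPartitionsSpec hashedByShards = new HashedPartitionsSpec(null, 2, null);
        // Single-dimension (range) partitioning, as used by DeterminePartitionsJobTest below.
        SingleDimensionPartitionsSpec singleDim = new SingleDimensionPartitionsSpec(3, null, null, false);
        // Dynamic partitioning for best-effort rollup, bounded by rows per segment and total rows.
        DynamicPartitionsSpec dynamic = new DynamicPartitionsSpec(5_000_000, 20_000_000L);

        System.out.println(hashedBySize.getMaxRowsPerSegment());   // 100
        System.out.println(hashedByShards.getNumShards());         // 2
        System.out.println(singleDim.getMaxRowsPerSegment());      // 3
        System.out.println(dynamic.getMaxTotalRows());             // 20000000
      }
    }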
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
index 2649874..c513f0c 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -73,7 +73,7 @@ public class DetermineHashedPartitionsJobTest
         new Object[][]{
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
-                1L,
+                1,
                 "2011-04-10T00:00:00.000Z/2011-04-11T00:00:00.000Z",
                 0,
                 1,
@@ -82,7 +82,7 @@ public class DetermineHashedPartitionsJobTest
             },
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
-                100L,
+                100,
                 "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z",
                 0,
                 6,
@@ -91,7 +91,7 @@ public class DetermineHashedPartitionsJobTest
             },
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
-                1L,
+                1,
                 "2011-04-10T00:00:00.000Z/2011-04-16T00:00:00.000Z",
                 0,
                 6,
@@ -100,7 +100,7 @@ public class DetermineHashedPartitionsJobTest
             },
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.duplicate.rows.tsv").getPath(),
-                1L,
+                1,
                 null,
                 0,
                 6,
@@ -109,7 +109,7 @@ public class DetermineHashedPartitionsJobTest
             },
             {
                 DetermineHashedPartitionsJobTest.class.getResource("/druid.test.data.with.rows.in.timezone.tsv").getPath(),
-                1L,
+                1,
                 null,
                 0,
                 1,
@@ -122,7 +122,7 @@ public class DetermineHashedPartitionsJobTest
 
   public DetermineHashedPartitionsJobTest(
       String dataFilePath,
-      long targetPartitionSize,
+      int targetPartitionSize,
       String interval,
       int errorMargin,
       int expectedNumTimeBuckets,
@@ -194,7 +194,7 @@ public class DetermineHashedPartitionsJobTest
         new HadoopTuningConfig(
             tmpDir.getAbsolutePath(),
             null,
-            new HashedPartitionsSpec(targetPartitionSize, null, true, null, null),
+            new HashedPartitionsSpec(targetPartitionSize, null, null),
             null,
             null,
             null,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
index 3d8b06b..2c35b75 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
@@ -72,7 +72,7 @@ public class DeterminePartitionsJobTest
         new Object[][]{
             {
                 true,
-                3L,
+                3,
                 "2014-10-22T00:00:00Z/P1D",
                 1,
                 new int[]{5},
@@ -100,7 +100,7 @@ public class DeterminePartitionsJobTest
             },
             {
                 false,
-                3L,
+                3,
                 "2014-10-20T00:00:00Z/P1D",
                 1,
                 new int[]{5},
@@ -138,7 +138,7 @@ public class DeterminePartitionsJobTest
             },
             {
                 true,
-                6L,
+                6,
                 "2014-10-20T00:00:00Z/P3D",
                 3,
                 new int[]{2, 2, 2},
@@ -198,7 +198,7 @@ public class DeterminePartitionsJobTest
 
   public DeterminePartitionsJobTest(
       boolean assumeGrouped,
-      Long targetPartitionSize,
+      Integer targetPartitionSize,
       String interval,
       int expectedNumOfSegments,
       int[] expectedNumOfShardsForEachSegment,
@@ -257,7 +257,7 @@ public class DeterminePartitionsJobTest
             new HadoopTuningConfig(
                 tmpDir.getCanonicalPath(),
                 null,
-                new SingleDimensionPartitionsSpec(null, targetPartitionSize, null, assumeGrouped),
+                new SingleDimensionPartitionsSpec(targetPartitionSize, null, null, assumeGrouped),
                 null,
                 null,
                 null,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
index 3b17d41..fbd13c9 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIngestionSpecTest.java
@@ -41,11 +41,11 @@ import java.util.Collections;
 
 public class HadoopIngestionSpecTest
 {
-  private static final ObjectMapper jsonMapper;
+  private static final ObjectMapper JSON_MAPPER;
 
   static {
-    jsonMapper = new DefaultObjectMapper();
-    jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
+    JSON_MAPPER = new DefaultObjectMapper();
+    JSON_MAPPER.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, JSON_MAPPER));
   }
 
   @Test
@@ -146,15 +146,11 @@ public class HadoopIngestionSpecTest
 
     final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
 
-    Assert.assertEquals(
-        "isDeterminingPartitions",
-        partitionsSpec.isDeterminingPartitions(),
-        true
-    );
+    Assert.assertTrue("isDeterminingPartitions", partitionsSpec.needsDeterminePartitions(true));
 
     Assert.assertEquals(
         "getTargetPartitionSize",
-        partitionsSpec.getTargetPartitionSize(),
+        partitionsSpec.getMaxRowsPerSegment().intValue(),
         100
     );
 
@@ -190,17 +186,18 @@ public class HadoopIngestionSpecTest
       throw new RuntimeException(e);
     }
 
-    final PartitionsSpec partitionsSpec = schema.getTuningConfig().getPartitionsSpec();
+    final SingleDimensionPartitionsSpec partitionsSpec =
+        (SingleDimensionPartitionsSpec) schema.getTuningConfig().getPartitionsSpec();
 
     Assert.assertEquals(
         "isDeterminingPartitions",
-        partitionsSpec.isDeterminingPartitions(),
+        partitionsSpec.needsDeterminePartitions(true),
         true
     );
 
     Assert.assertEquals(
         "getTargetPartitionSize",
-        partitionsSpec.getTargetPartitionSize(),
+        partitionsSpec.getMaxRowsPerSegment().intValue(),
         100
     );
 
@@ -213,7 +210,7 @@ public class HadoopIngestionSpecTest
     Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
+        partitionsSpec.getPartitionDimension(),
         "foo"
     );
   }
@@ -275,10 +272,9 @@ public class HadoopIngestionSpecTest
         false
     );
 
-    Assert.assertEquals(
+    Assert.assertFalse(
         "isDeterminingPartitions",
-        schema.getTuningConfig().getPartitionsSpec().isDeterminingPartitions(),
-        false
+        schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true)
     );
 
     Assert.assertFalse(Strings.isNullOrEmpty(schema.getUniqueId()));
@@ -338,7 +334,7 @@ public class HadoopIngestionSpecTest
   private <T> T jsonReadWriteRead(String s, Class<T> klass)
   {
     try {
-      return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
+      return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass);
     }
     catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java
index e8074ea..3e13fa2 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/partitions/HashedPartitionsSpecTest.java
@@ -29,110 +29,84 @@ import org.junit.Test;
  */
 public class HashedPartitionsSpecTest
 {
-  private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
+  private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
 
   @Test
   public void testHashedPartitionsSpec()
   {
     {
-      final PartitionsSpec partitionsSpec;
-
-      try {
-        partitionsSpec = jsonReadWriteRead(
-            "{"
-            + "   \"targetPartitionSize\":100,"
-            + "   \"type\":\"hashed\""
-            + "}",
-            PartitionsSpec.class
-        );
-      }
-      catch (Exception e) {
-        throw new RuntimeException(e);
-      }
+      final PartitionsSpec partitionsSpec = jsonReadWriteRead(
+          "{"
+          + "   \"targetPartitionSize\":100,"
+          + "   \"type\":\"hashed\""
+          + "}",
+          PartitionsSpec.class
+      );
+      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
+      final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec;
 
       Assert.assertEquals(
           "isDeterminingPartitions",
-          partitionsSpec.isDeterminingPartitions(),
+          hadoopHashedPartitionsSpec.needsDeterminePartitions(true),
           true
       );
 
       Assert.assertEquals(
           "getTargetPartitionSize",
-          partitionsSpec.getTargetPartitionSize(),
+          hadoopHashedPartitionsSpec.getMaxRowsPerSegment().intValue(),
           100
       );
 
       Assert.assertEquals(
-          "getMaxPartitionSize",
-          partitionsSpec.getMaxPartitionSize(),
-          150
-      );
-
-      Assert.assertEquals(
           "getPartitionDimensions",
-          partitionsSpec.getPartitionDimensions(),
+          hadoopHashedPartitionsSpec.getPartitionDimensions(),
           ImmutableList.of()
       );
-
-      Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
     }
   }
 
   @Test
   public void testHashedPartitionsSpecShardCount()
   {
-    final PartitionsSpec partitionsSpec;
-
-    try {
-      partitionsSpec = jsonReadWriteRead(
-          "{"
-          + "   \"type\":\"hashed\","
-          + "   \"numShards\":2"
-          + "}",
-          PartitionsSpec.class
-      );
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    final PartitionsSpec partitionsSpec = jsonReadWriteRead(
+        "{"
+        + "   \"type\":\"hashed\","
+        + "   \"numShards\":2"
+        + "}",
+        PartitionsSpec.class
+    );
+    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
+    final HashedPartitionsSpec hadoopHashedPartitionsSpec = (HashedPartitionsSpec) partitionsSpec;
 
     Assert.assertEquals(
         "isDeterminingPartitions",
-        partitionsSpec.isDeterminingPartitions(),
+        hadoopHashedPartitionsSpec.needsDeterminePartitions(true),
         false
     );
 
-    Assert.assertEquals(
+    Assert.assertNull(
         "getTargetPartitionSize",
-        partitionsSpec.getTargetPartitionSize(),
-        -1
-    );
-
-    Assert.assertEquals(
-        "getMaxPartitionSize",
-        partitionsSpec.getMaxPartitionSize(),
-        -1
+        hadoopHashedPartitionsSpec.getMaxRowsPerSegment()
     );
 
     Assert.assertEquals(
         "shardCount",
-        partitionsSpec.getNumShards(),
+        hadoopHashedPartitionsSpec.getNumShards().intValue(),
         2
     );
 
     Assert.assertEquals(
         "getPartitionDimensions",
-        partitionsSpec.getPartitionDimensions(),
+        hadoopHashedPartitionsSpec.getPartitionDimensions(),
         ImmutableList.of()
     );
 
-    Assert.assertTrue("partitionsSpec", partitionsSpec instanceof HashedPartitionsSpec);
   }
   
   private <T> T jsonReadWriteRead(String s, Class<T> klass)
   {
     try {
-      return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
+      return JSON_MAPPER.readValue(JSON_MAPPER.writeValueAsBytes(JSON_MAPPER.readValue(s, klass)), klass);
     }
     catch (Exception e) {
       throw new RuntimeException(e);
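      }

The two hashed cases in this test also pin down the new needsDeterminePartitions behaviour: a spec sized by targetPartitionSize must scan the input to determine shard counts, while an explicit numShards does not. A small sketch of the same checks outside the serde round-trip (constructor argument order assumed from the other call sites in this patch):

    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;

    public class NeedsDeterminePartitionsDemo
    {
      public static void main(String[] args)
      {
        // Sized by rows per segment: shard counts must be determined from the input.
        System.out.println(new HashedPartitionsSpec(100, null, null).needsDeterminePartitions(true));  // true
        // Explicit shard count: nothing left to determine up front.
        System.out.println(new HashedPartitionsSpec(null, 2, null).needsDeterminePartitions(true));    // false
      }
    }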
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 1676a2f..3a96070 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -39,7 +40,6 @@ import java.io.File;
 public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig
 {
   private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
-  private static final int defaultMaxRowsPerSegment = 5_000_000;
   private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
   private static final int defaultMaxPendingPersists = 0;
   private static final ShardSpec defaultShardSpec = new NumberedShardSpec(0, 1);
@@ -55,9 +55,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
 
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
-  private final int maxRowsPerSegment;
-  @Nullable
-  private final Long maxTotalRows;
+  private final DynamicPartitionsSpec partitionsSpec;
   private final Period intermediatePersistPeriod;
   private final File basePersistDirectory;
   private final int maxPendingPersists;
@@ -96,11 +94,10 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
   )
   {
     this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
-    this.maxRowsPerSegment = maxRowsPerSegment == null ? defaultMaxRowsPerSegment : maxRowsPerSegment;
     // initializing this to 0, it will be lazily initialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
-    this.maxTotalRows = maxTotalRows;
+    this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? defaultIntermediatePersistPeriod
                                      : intermediatePersistPeriod;
@@ -155,7 +152,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
   @JsonProperty
   public Integer getMaxRowsPerSegment()
   {
-    return maxRowsPerSegment;
+    return partitionsSpec.getMaxRowsPerSegment();
   }
 
   @Override
@@ -163,7 +160,12 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
   @Nullable
   public Long getMaxTotalRows()
   {
-    return maxTotalRows;
+    return partitionsSpec.getMaxTotalRows();
+  }
+
+  public DynamicPartitionsSpec getPartitionsSpec()
+  {
+    return partitionsSpec;
   }
 
   @Override
@@ -257,8 +259,8 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
     return new RealtimeAppenderatorTuningConfig(
         maxRowsInMemory,
         maxBytesInMemory,
-        maxRowsPerSegment,
-        maxTotalRows,
+        partitionsSpec.getMaxRowsPerSegment(),
+        partitionsSpec.getMaxTotalRows(),
         intermediatePersistPeriod,
         dir,
         maxPendingPersists,
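
The tuning config above no longer stores maxRowsPerSegment and maxTotalRows directly; it wraps them in a DynamicPartitionsSpec and the old getters delegate to it. A minimal sketch of that delegation pattern (the field and getter names follow the patch, the wrapper class itself is illustrative):

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;

    public class DelegatingTuningConfigSketch
    {
      private final DynamicPartitionsSpec partitionsSpec;

      public DelegatingTuningConfigSketch(Integer maxRowsPerSegment, Long maxTotalRows)
      {
        // The two legacy knobs collapse into a single partitions spec.
        this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
      }

      public Integer getMaxRowsPerSegment()
      {
        return partitionsSpec.getMaxRowsPerSegment();
      }

      public Long getMaxTotalRows()
      {
        return partitionsSpec.getMaxTotalRows();
      }
    }

Since the defaultMaxRowsPerSegment constant is removed above, the 5,000,000-row default presumably now lives inside the partitions spec itself (PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT is referenced later in this patch).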
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 9ec84de..3c22d20 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -368,7 +368,11 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
             AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName, committerSupplier);
 
             if (addResult.isOk()) {
-              if (addResult.isPushRequired(tuningConfig)) {
+              final boolean isPushRequired = addResult.isPushRequired(
+                  tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
+                  tuningConfig.getPartitionsSpec().getMaxTotalRows()
+              );
+              if (isPushRequired) {
                 publishSegments(driver, publisher, committerSupplier, sequenceName);
                 sequenceNumber++;
                 sequenceName = makeSequenceName(getId(), sequenceNumber);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 6dc1e0f..1f16f01 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -41,6 +41,8 @@ import org.apache.druid.data.input.impl.NoopInputRowParser;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -853,8 +855,14 @@ public class CompactionTask extends AbstractBatchIndexTask
         // Setting maxTotalRows to Long.MAX_VALUE to respect the computed maxRowsPerSegment.
         // If this is set to something too small, compactionTask can generate small segments
        // which need to be compacted again, which in turn makes auto compaction stuck in the same interval.
-        return (tuningConfig == null ? IndexTuningConfig.createDefault() : tuningConfig)
-            .withMaxRowsPerSegment(maxRowsPerSegment).withMaxTotalRows(Long.MAX_VALUE);
+        final IndexTuningConfig newTuningConfig = tuningConfig == null
+                                                       ? IndexTuningConfig.createDefault()
+                                                       : tuningConfig;
+        if (newTuningConfig.isForceGuaranteedRollup()) {
+          return newTuningConfig.withPartitionsSpec(new HashedPartitionsSpec(maxRowsPerSegment, null, null));
+        } else {
+          return newTuningConfig.withPartitionsSpec(new DynamicPartitionsSpec(maxRowsPerSegment, Long.MAX_VALUE));
+        }
       } else {
         return tuningConfig;
       }
@@ -862,8 +870,7 @@ public class CompactionTask extends AbstractBatchIndexTask
 
     /**
      * Check the validity of {@link #targetCompactionSizeBytes} and return a valid value. Note that
-     * targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment},
-     * {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together.
+     * targetCompactionSizeBytes cannot be used together with {@link IndexTuningConfig#getPartitionsSpec}.
      * {@link #hasPartitionConfig} checks one of those configs is set.
      * <p>
      * This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
@@ -880,12 +887,9 @@ public class CompactionTask extends AbstractBatchIndexTask
       if (targetCompactionSizeBytes != null && tuningConfig != null) {
         Preconditions.checkArgument(
             !hasPartitionConfig(tuningConfig),
-            "targetCompactionSizeBytes[%s] cannot be used with maxRowsPerSegment[%s], maxTotalRows[%s],"
-            + " or numShards[%s] of tuningConfig",
+            "targetCompactionSizeBytes[%s] cannot be used with partitionsSpec[%s]",
             targetCompactionSizeBytes,
-            tuningConfig.getMaxRowsPerSegment(),
-            tuningConfig.getMaxTotalRows(),
-            tuningConfig.getNumShards()
+            tuningConfig.getPartitionsSpec()
         );
         return targetCompactionSizeBytes;
       } else {
@@ -898,9 +902,7 @@ public class CompactionTask extends AbstractBatchIndexTask
     private static boolean hasPartitionConfig(@Nullable IndexTuningConfig tuningConfig)
     {
       if (tuningConfig != null) {
-        return tuningConfig.getMaxRowsPerSegment() != null
-               || tuningConfig.getMaxTotalRows() != null
-               || tuningConfig.getNumShards() != null;
+        return tuningConfig.getPartitionsSpec() != null;
       } else {
         return false;
       }
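
A short sketch of the branch introduced above: when compaction derives maxRowsPerSegment from targetCompactionSizeBytes, the partitions spec it builds depends on whether perfect rollup is forced. The constructor calls mirror the patch; the wrapper class and method names are illustrative only:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
    import org.apache.druid.indexer.partitions.PartitionsSpec;

    public final class CompactionPartitionsSpecChooser
    {
      public static PartitionsSpec choose(boolean forceGuaranteedRollup, int maxRowsPerSegment)
      {
        if (forceGuaranteedRollup) {
          // Perfect rollup: hash partitioning sized by the computed maxRowsPerSegment.
          return new HashedPartitionsSpec(maxRowsPerSegment, null, null);
        }
        // Best-effort rollup: dynamic partitioning with maxTotalRows effectively unbounded,
        // so the computed maxRowsPerSegment is respected and tiny segments are avoided.
        return new DynamicPartitionsSpec(maxRowsPerSegment, Long.MAX_VALUE);
      }
    }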
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index b89772c..16254de 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -39,6 +39,9 @@ import org.apache.druid.data.input.Rows;
 import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
@@ -460,19 +463,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
       // Initialize maxRowsPerSegment and maxTotalRows lazily
       final IndexTuningConfig tuningConfig = ingestionSchema.tuningConfig;
-      @Nullable
-      final Integer maxRowsPerSegment = getValidMaxRowsPerSegment(tuningConfig);
-      @Nullable
-      final Long maxTotalRows = getValidMaxTotalRows(tuningConfig);
-      // Spec for segment allocation. This is used only for perfect rollup mode.
-      // See createSegmentAllocator().
+      final PartitionsSpec partitionsSpec = tuningConfig.getGivenOrDefaultPartitionsSpec();
       final Map<Interval, Pair<ShardSpecFactory, Integer>> allocateSpec = determineShardSpecs(
           toolbox,
           firehoseFactory,
           firehoseTempDir,
-          maxRowsPerSegment
+          partitionsSpec
       );
-
       final List<Interval> allocateIntervals = new ArrayList<>(allocateSpec.keySet());
       final DataSchema dataSchema;
       if (determineIntervals) {
@@ -496,8 +493,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
           allocateSpec,
           firehoseFactory,
           firehoseTempDir,
-          maxRowsPerSegment,
-          maxTotalRows
+          partitionsSpec
       );
     }
     catch (Exception e) {
@@ -597,7 +593,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
    * specified in {@link IndexTuningConfig}.
    * <p/>
    * If both intervals and shardSpecs don't have to be determined, this method simply returns {@link ShardSpecs} for the
-   * given intervals.  Here, if {@link IndexTuningConfig#numShards} is not specified, {@link NumberedShardSpec} is used.
+   * given intervals.  Here, if {@link HashedPartitionsSpec#numShards} is not specified, {@link NumberedShardSpec} is
+   * used.
    * <p/>
    * If one of intervals or shardSpecs need to be determined, this method reads the entire input for determining one of
    * them.  If the perfect rollup must be guaranteed, {@link HashBasedNumberedShardSpec} is used for hash partitioning
@@ -609,7 +606,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       final TaskToolbox toolbox,
       final FirehoseFactory firehoseFactory,
       final File firehoseTempDir,
-      @Nullable final Integer maxRowsPerSegment
+      final PartitionsSpec nonNullPartitionsSpec
   ) throws IOException
   {
     final ObjectMapper jsonMapper = toolbox.getObjectMapper();
@@ -622,13 +619,17 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     final boolean determineIntervals = !granularitySpec.bucketIntervals().isPresent();
 
     // Must determine partitions if rollup is guaranteed and the user didn't provide a specific value.
-    final boolean determineNumPartitions = tuningConfig.getNumShards() == null
-                                           && isGuaranteedRollup(ioConfig, tuningConfig);
+    final boolean determineNumPartitions = nonNullPartitionsSpec.needsDeterminePartitions(false);
 
     // if we were given number of shards per interval and the intervals, we don't need to scan the data
     if (!determineNumPartitions && !determineIntervals) {
       log.info("Skipping determine partition scan");
-      return createShardSpecWithoutInputScan(granularitySpec, ioConfig, tuningConfig);
+      return createShardSpecWithoutInputScan(
+          granularitySpec,
+          ioConfig,
+          tuningConfig,
+          nonNullPartitionsSpec
+      );
     } else {
       // determine intervals containing data and prime HLL collectors
       return createShardSpecsFromInput(
@@ -637,10 +638,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
           firehoseFactory,
           firehoseTempDir,
           granularitySpec,
-          tuningConfig,
-          determineIntervals,
-          determineNumPartitions,
-          maxRowsPerSegment
+          nonNullPartitionsSpec,
+          determineIntervals
       );
     }
   }
@@ -648,7 +647,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   private Map<Interval, Pair<ShardSpecFactory, Integer>> createShardSpecWithoutInputScan(
       GranularitySpec granularitySpec,
       IndexIOConfig ioConfig,
-      IndexTuningConfig tuningConfig
+      IndexTuningConfig tuningConfig,
+      PartitionsSpec nonNullPartitionsSpec
   )
   {
     final Map<Interval, Pair<ShardSpecFactory, Integer>> allocateSpec = new HashMap<>();
@@ -656,11 +656,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
     if (isGuaranteedRollup(ioConfig, tuningConfig)) {
       // Overwrite mode, guaranteed rollup: shardSpecs must be known in advance.
-      final int numShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
+      assert nonNullPartitionsSpec instanceof HashedPartitionsSpec;
+      final HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) nonNullPartitionsSpec;
+      final int numShards = partitionsSpec.getNumShards() == null ? 1 : partitionsSpec.getNumShards();
+
       for (Interval interval : intervals) {
         allocateSpec.put(
             interval,
-            createShardSpecFactoryForGuaranteedRollup(numShards, tuningConfig.partitionDimensions)
+            createShardSpecFactoryForGuaranteedRollup(numShards, partitionsSpec.getPartitionDimensions())
         );
       }
     } else {
@@ -678,10 +681,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       FirehoseFactory firehoseFactory,
       File firehoseTempDir,
       GranularitySpec granularitySpec,
-      IndexTuningConfig tuningConfig,
-      boolean determineIntervals,
-      boolean determineNumPartitions,
-      @Nullable Integer maxRowsPerSegment
+      PartitionsSpec nonNullPartitionsSpec,
+      boolean determineIntervals
   ) throws IOException
   {
     log.info("Determining intervals and shardSpecs");
@@ -693,33 +694,42 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         firehoseFactory,
         firehoseTempDir,
         granularitySpec,
-        determineIntervals,
-        determineNumPartitions
+        nonNullPartitionsSpec,
+        determineIntervals
     );
 
     final Map<Interval, Pair<ShardSpecFactory, Integer>> allocateSpecs = new HashMap<>();
-    final int defaultNumShards = tuningConfig.getNumShards() == null ? 1 : tuningConfig.getNumShards();
     for (final Map.Entry<Interval, Optional<HyperLogLogCollector>> entry : hllCollectors.entrySet()) {
       final Interval interval = entry.getKey();
-      final HyperLogLogCollector collector = entry.getValue().orNull();
-
-      final int numShards;
-      if (determineNumPartitions) {
-        final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound();
-        numShards = (int) Math.ceil(
-            (double) numRows / Preconditions.checkNotNull(maxRowsPerSegment, "maxRowsPerSegment")
-        );
-        log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
-      } else {
-        numShards = defaultNumShards;
-        log.info("Creating [%,d] shards for interval [%s]", numShards, interval);
-      }
 
       if (isGuaranteedRollup(ingestionSchema.getIOConfig(), ingestionSchema.getTuningConfig())) {
+        assert nonNullPartitionsSpec instanceof HashedPartitionsSpec;
+        final HashedPartitionsSpec partitionsSpec = (HashedPartitionsSpec) nonNullPartitionsSpec;
+
+        final HyperLogLogCollector collector = entry.getValue().orNull();
+
+        final int numShards;
+        if (partitionsSpec.needsDeterminePartitions(false)) {
+          final long numRows = Preconditions.checkNotNull(collector, "HLL collector").estimateCardinalityRound();
+          final int nonNullMaxRowsPerSegment = partitionsSpec.getMaxRowsPerSegment() == null
+                                               ? PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT
+                                               : partitionsSpec.getMaxRowsPerSegment();
+          numShards = (int) Math.ceil((double) numRows / nonNullMaxRowsPerSegment);
+          log.info(
+              "Estimated [%,d] rows of data for interval [%s], creating [%,d] shards",
+              numRows,
+              interval,
+              numShards
+          );
+        } else {
+          numShards = partitionsSpec.getNumShards() == null ? 1 : partitionsSpec.getNumShards();
+          log.info("Creating [%,d] shards for interval [%s]", numShards, interval);
+        }
+
         // Overwrite mode, guaranteed rollup: # of shards must be known in advance.
         allocateSpecs.put(
             interval,
-            createShardSpecFactoryForGuaranteedRollup(numShards, tuningConfig.partitionDimensions)
+            createShardSpecFactoryForGuaranteedRollup(numShards, partitionsSpec.getPartitionDimensions())
         );
       } else {
         allocateSpecs.put(interval, null);
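
As a worked example of the estimate above (values chosen only for illustration): if the HLL collector reports roughly 1,200,000 rows for an interval and maxRowsPerSegment is 500,000, the task creates ceil(1,200,000 / 500,000) = 3 hash-partitioned shards for that interval. If maxRowsPerSegment is unset, PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT takes its place, and if numShards was given instead, that count is used directly for every interval and the collector is ignored.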
@@ -744,8 +754,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       FirehoseFactory firehoseFactory,
       File firehoseTempDir,
       GranularitySpec granularitySpec,
-      boolean determineIntervals,
-      boolean determineNumPartitions
+      PartitionsSpec nonNullPartitionsSpec,
+      boolean determineIntervals
   ) throws IOException
   {
     final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = new TreeMap<>(
@@ -787,7 +797,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
             interval = optInterval.get();
           }
 
-          if (determineNumPartitions) {
+          if (nonNullPartitionsSpec.needsDeterminePartitions(false)) {
             hllCollectors.computeIfAbsent(interval, intv -> Optional.of(HyperLogLogCollector.makeLatestCollector()));
 
             List<Object> groupKey = Rows.toGroupKey(
@@ -860,14 +870,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   /**
    * This method reads input data row by row and adds the read row to a proper segment using {@link BaseAppenderatorDriver}.
    * If there is no segment for the row, a new one is created.  Segments can be published in the middle of reading inputs
-   * if one of below conditions are satisfied.
+   * if {@link DynamicPartitionsSpec} is used and one of the below conditions is satisfied.
    *
    * <ul>
    * <li>
-   * If the number of rows in a segment exceeds {@link IndexTuningConfig#maxRowsPerSegment}
+   * If the number of rows in a segment exceeds {@link DynamicPartitionsSpec#maxRowsPerSegment}
    * </li>
    * <li>
-   * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link IndexTuningConfig#maxTotalRows}
+   * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows}
    * </li>
    * </ul>
    * <p>
@@ -881,10 +891,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       final Map<Interval, Pair<ShardSpecFactory, Integer>> allocateSpec,
       final FirehoseFactory firehoseFactory,
       final File firehoseTempDir,
-      @Nullable final Integer maxRowsPerSegment,
-      @Nullable final Long maxTotalRows
+      final PartitionsSpec partitionsSpec
   ) throws IOException, InterruptedException
   {
+    @Nullable
+    final DynamicPartitionsSpec dynamicPartitionsSpec = partitionsSpec instanceof DynamicPartitionsSpec
+                                                        ? (DynamicPartitionsSpec) partitionsSpec
+                                                        : null;
     final GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
     final FireDepartment fireDepartmentForMetrics =
         new FireDepartment(dataSchema, new RealtimeIOConfig(null, null), null);
@@ -899,10 +912,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       toolbox.getMonitorScheduler().addMonitor(metricsMonitor);
     }
 
-    final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
     final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
     final long pushTimeout = tuningConfig.getPushTimeout();
-    final boolean isGuaranteedRollup = isGuaranteedRollup(ioConfig, tuningConfig);
 
     final IndexTaskSegmentAllocator segmentAllocator = createSegmentAllocator(
         toolbox,
@@ -956,13 +967,20 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
           final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
 
           if (addResult.isOk()) {
+
            // incremental segment publishing is allowed only when rollup doesn't have to be perfect.
-            if (!isGuaranteedRollup && addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) {
-              // There can be some segments waiting for being published even though any rows won't be added to them.
-              // If those segments are not published here, the available space in appenderator will be kept to be small
-              // which makes the size of segments smaller.
-              final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
-              log.info("Pushed segments[%s]", pushed.getSegments());
+            if (dynamicPartitionsSpec != null) {
+              final boolean isPushRequired = addResult.isPushRequired(
+                  dynamicPartitionsSpec.getMaxRowsPerSegment(),
+                  dynamicPartitionsSpec.getMaxTotalRows()
+              );
+              if (isPushRequired) {
+                // There can be segments waiting to be published even though no more rows will be added to them.
+                // If those segments are not published here, the available space in the appenderator stays small,
+                // which makes the resulting segments smaller.
+                final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
+                log.info("Pushed segments[%s]", pushed.getSegments());
+              }
             }
           } else {
             throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
@@ -1017,44 +1035,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
-  /**
-   * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null.
-   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_ROWS_PER_SEGMENT} or the given
-   * {@link IndexTuningConfig#maxRowsPerSegment}.
-   */
-  public static Integer getValidMaxRowsPerSegment(IndexTuningConfig tuningConfig)
-  {
-    @Nullable
-    final Integer numShards = tuningConfig.numShards;
-    @Nullable
-    final Integer maxRowsPerSegment = tuningConfig.maxRowsPerSegment;
-    if (numShards == null || numShards == -1) {
-      return maxRowsPerSegment == null || maxRowsPerSegment.equals(-1)
-             ? IndexTuningConfig.DEFAULT_MAX_ROWS_PER_SEGMENT
-             : maxRowsPerSegment;
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Return the valid target partition size. If {@link IndexTuningConfig#numShards} is valid, this returns null.
-   * Otherwise, this returns {@link IndexTuningConfig#DEFAULT_MAX_TOTAL_ROWS} or the given
-   * {@link IndexTuningConfig#maxTotalRows}.
-   */
-  public static Long getValidMaxTotalRows(IndexTuningConfig tuningConfig)
-  {
-    @Nullable
-    final Integer numShards = tuningConfig.numShards;
-    @Nullable
-    final Long maxTotalRows = tuningConfig.maxTotalRows;
-    if (numShards == null || numShards == -1) {
-      return maxTotalRows == null ? IndexTuningConfig.DEFAULT_MAX_TOTAL_ROWS : maxTotalRows;
-    } else {
-      return null;
-    }
-  }
-
   private void handleParseException(ParseException e)
   {
     if (e.isFromPartiallyValidRow()) {
@@ -1244,24 +1224,18 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   @JsonTypeName("index")
   public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
   {
-    static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
-    static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
-
     private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
     private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
     private static final boolean DEFAULT_GUARANTEE_ROLLUP = false;
     private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
     private static final long DEFAULT_PUSH_TIMEOUT = 0;
 
-    @Nullable
-    private final Integer maxRowsPerSegment;
     private final int maxRowsInMemory;
     private final long maxBytesInMemory;
+
+    // null if all partitionsSpec-related params are null. See getDefaultPartitionsSpec() for details.
     @Nullable
-    private final Long maxTotalRows;
-    @Nullable
-    private final Integer numShards;
-    private final List<String> partitionDimensions;
+    private final PartitionsSpec partitionsSpec;
     private final IndexSpec indexSpec;
     private final IndexSpec indexSpecForIntermediatePersists;
     private final File basePersistDirectory;
@@ -1288,21 +1262,60 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       return new IndexTuningConfig();
     }
 
+    @Nullable
+    private static PartitionsSpec getDefaultPartitionsSpec(
+        boolean forceGuaranteedRollup,
+        @Nullable PartitionsSpec partitionsSpec,
+        @Nullable Integer maxRowsPerSegment,
+        @Nullable Long maxTotalRows,
+        @Nullable Integer numShards,
+        @Nullable List<String> partitionDimensions
+    )
+    {
+      if (partitionsSpec == null) {
+        if (forceGuaranteedRollup) {
+          if (maxRowsPerSegment != null
+              || numShards != null
+              || (partitionDimensions != null && !partitionDimensions.isEmpty())) {
+            return new HashedPartitionsSpec(maxRowsPerSegment, numShards, partitionDimensions);
+          } else {
+            return null;
+          }
+        } else {
+          if (maxRowsPerSegment != null || maxTotalRows != null) {
+            return new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
+          } else {
+            return null;
+          }
+        }
+      } else {
+        if (forceGuaranteedRollup) {
+          if (!(partitionsSpec instanceof HashedPartitionsSpec)) {
+            throw new ISE("HashedPartitionsSpec must be used for perfect rollup");
+          }
+        } else {
+          if (!(partitionsSpec instanceof DynamicPartitionsSpec)) {
+            throw new ISE("DynamicPartitionsSpec must be used for best-effort rollup");
+          }
+        }
+        return partitionsSpec;
+      }
+    }
+
     @JsonCreator
     public IndexTuningConfig(
         @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
-        @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+        @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
         @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
         @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
-        @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+        @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
         @JsonProperty("rowFlushBoundary") @Deprecated @Nullable Integer rowFlushBoundary_forBackCompatibility,
-        @JsonProperty("numShards") @Nullable Integer numShards,
-        @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
+        @JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
+        @JsonProperty("partitionDimensions") @Deprecated @Nullable List<String> partitionDimensions,
+        @JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec,
         @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
         @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
         @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
-        // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12.
-        @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
         @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup,
         @JsonProperty("reportParseExceptions") @Deprecated @Nullable Boolean reportParseExceptions,
         @JsonProperty("publishTimeout") @Deprecated @Nullable Long publishTimeout,
@@ -1315,12 +1328,16 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     )
     {
       this(
-          maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment,
           maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
           maxBytesInMemory != null ? maxBytesInMemory : 0,
-          maxTotalRows,
-          numShards,
-          partitionDimensions,
+          getDefaultPartitionsSpec(
+              forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup,
+              partitionsSpec,
+              maxRowsPerSegment == null ? targetPartitionSize : maxRowsPerSegment,
+              maxTotalRows,
+              numShards,
+              partitionDimensions
+          ),
           indexSpec,
           indexSpecForIntermediatePersists,
           maxPendingPersists,
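
Concretely, the back-compat path above means a legacy spec that set, say, maxRowsPerSegment = 1,000,000 with forceGuaranteedRollup = false is read as a DynamicPartitionsSpec(1,000,000, null), while numShards = 4 with forceGuaranteedRollup = true becomes a HashedPartitionsSpec(null, 4, null); supplying an explicit partitionsSpec of the wrong kind for the rollup mode fails fast with an ISE. The example values here are illustrative only.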
@@ -1342,16 +1359,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
     private IndexTuningConfig()
     {
-      this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+      this(null, null, null, null, null, null, null, null, null, null, null, null, null, null);
     }
 
     private IndexTuningConfig(
-        @Nullable Integer maxRowsPerSegment,
         @Nullable Integer maxRowsInMemory,
         @Nullable Long maxBytesInMemory,
-        @Nullable Long maxTotalRows,
-        @Nullable Integer numShards,
-        @Nullable List<String> partitionDimensions,
+        @Nullable PartitionsSpec partitionsSpec,
         @Nullable IndexSpec indexSpec,
         @Nullable IndexSpec indexSpecForIntermediatePersists,
         @Nullable Integer maxPendingPersists,
@@ -1365,21 +1379,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         @Nullable Integer maxSavedParseExceptions
     )
     {
-      Preconditions.checkArgument(
-          maxRowsPerSegment == null || maxRowsPerSegment.equals(-1) || numShards == null || numShards.equals(-1),
-          "maxRowsPerSegment and numShards cannot both be set"
-      );
-
-      this.maxRowsPerSegment = (maxRowsPerSegment != null && maxRowsPerSegment == -1)
-                               ? null
-                               : maxRowsPerSegment;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
       // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
       this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
-      this.maxTotalRows = maxTotalRows;
-      this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
-      this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions;
+      this.partitionsSpec = partitionsSpec;
       this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
       this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
                                               this.indexSpec : indexSpecForIntermediatePersists;
@@ -1412,12 +1416,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public IndexTuningConfig withBasePersistDirectory(File dir)
     {
       return new IndexTuningConfig(
-          maxRowsPerSegment,
           maxRowsInMemory,
           maxBytesInMemory,
-          maxTotalRows,
-          numShards,
-          partitionDimensions,
+          partitionsSpec,
           indexSpec,
           indexSpecForIntermediatePersists,
           maxPendingPersists,
@@ -1432,38 +1433,12 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
       );
     }
 
-    public IndexTuningConfig withMaxTotalRows(Long maxTotalRows)
+    public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
     {
       return new IndexTuningConfig(
-          maxRowsPerSegment,
           maxRowsInMemory,
           maxBytesInMemory,
-          maxTotalRows,
-          numShards,
-          partitionDimensions,
-          indexSpec,
-          indexSpecForIntermediatePersists,
-          maxPendingPersists,
-          forceGuaranteedRollup,
-          reportParseExceptions,
-          pushTimeout,
-          basePersistDirectory,
-          segmentWriteOutMediumFactory,
-          logParseExceptions,
-          maxParseExceptions,
-          maxSavedParseExceptions
-      );
-    }
-
-    public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment)
-    {
-      return new IndexTuningConfig(
-          maxRowsPerSegment,
-          maxRowsInMemory,
-          maxBytesInMemory,
-          maxTotalRows,
-          numShards,
-          partitionDimensions,
+          partitionsSpec,
           indexSpec,
           indexSpecForIntermediatePersists,
           maxPendingPersists,
@@ -1480,14 +1455,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
     /**
      * Return the max number of rows per segment. This returns null if it's not specified in tuningConfig.
-     * Please use {@link IndexTask#getValidMaxRowsPerSegment} instead to get the valid value.
+     * Deprecated in favor of {@link #getGivenOrDefaultPartitionsSpec()}.
      */
     @Nullable
-    @JsonProperty
     @Override
+    @Deprecated
     public Integer getMaxRowsPerSegment()
     {
-      return maxRowsPerSegment;
+      return partitionsSpec == null ? null : partitionsSpec.getMaxRowsPerSegment();
     }
 
     @JsonProperty
@@ -1506,26 +1481,50 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
     /**
      * Return the max number of total rows in appenderator. This returns null if it's not specified in tuningConfig.
-     * Please use {@link IndexTask#getValidMaxTotalRows} instead to get the valid value.
+     * Deprecated in favor of {@link #getGivenOrDefaultPartitionsSpec()}.
      */
-    @JsonProperty
     @Override
     @Nullable
+    @Deprecated
     public Long getMaxTotalRows()
     {
-      return maxTotalRows;
+      return partitionsSpec instanceof DynamicPartitionsSpec
+             ? ((DynamicPartitionsSpec) partitionsSpec).getMaxTotalRows()
+             : null;
     }
 
-    @JsonProperty
+    @Deprecated
+    @Nullable
     public Integer getNumShards()
     {
-      return numShards;
+      return partitionsSpec instanceof HashedPartitionsSpec
+             ? ((HashedPartitionsSpec) partitionsSpec).getNumShards()
+             : null;
     }
 
-    @JsonProperty
+    @Deprecated
     public List<String> getPartitionDimensions()
     {
-      return partitionDimensions;
+      return partitionsSpec instanceof HashedPartitionsSpec
+             ? ((HashedPartitionsSpec) partitionsSpec).getPartitionDimensions()
+             : Collections.emptyList();
+    }
+
+    @JsonProperty
+    @Nullable
+    public PartitionsSpec getPartitionsSpec()
+    {
+      return partitionsSpec;
+    }
+
+    public PartitionsSpec getGivenOrDefaultPartitionsSpec()
+    {
+      if (partitionsSpec != null) {
+        return partitionsSpec;
+      }
+      return forceGuaranteedRollup
+             ? new HashedPartitionsSpec(null, null, null)
+             : new DynamicPartitionsSpec(null, null);
     }
 
     @JsonProperty
@@ -1635,10 +1634,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
              logParseExceptions == that.logParseExceptions &&
              maxParseExceptions == that.maxParseExceptions &&
              maxSavedParseExceptions == that.maxSavedParseExceptions &&
-             Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
-             Objects.equals(maxTotalRows, that.maxTotalRows) &&
-             Objects.equals(numShards, that.numShards) &&
-             Objects.equals(partitionDimensions, that.partitionDimensions) &&
+             Objects.equals(partitionsSpec, that.partitionsSpec) &&
              Objects.equals(indexSpec, that.indexSpec) &&
              Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
              Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
@@ -1649,12 +1645,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public int hashCode()
     {
       return Objects.hash(
-          maxRowsPerSegment,
           maxRowsInMemory,
           maxBytesInMemory,
-          maxTotalRows,
-          numShards,
-          partitionDimensions,
+          partitionsSpec,
           indexSpec,
           indexSpecForIntermediatePersists,
           basePersistDirectory,
@@ -1673,12 +1666,9 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public String toString()
     {
       return "IndexTuningConfig{" +
-             "maxRowsPerSegment=" + maxRowsPerSegment +
-             ", maxRowsInMemory=" + maxRowsInMemory +
+             "maxRowsInMemory=" + maxRowsInMemory +
              ", maxBytesInMemory=" + maxBytesInMemory +
-             ", maxTotalRows=" + maxTotalRows +
-             ", numShards=" + numShards +
-             ", partitionDimensions=" + partitionDimensions +
+             ", partitionsSpec=" + partitionsSpec +
              ", indexSpec=" + indexSpec +
              ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists +
              ", basePersistDirectory=" + basePersistDirectory +
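
A minimal sketch, not code from this commit, of how getGivenOrDefaultPartitionsSpec above resolves
defaults when no partitionsSpec is supplied; the helper class and method names are made up, and the
constructor shapes are the ones used elsewhere in this diff:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
    import org.apache.druid.indexer.partitions.PartitionsSpec;

    class PartitionsSpecDefaultsSketch
    {
      // An explicit partitionsSpec always wins; otherwise forceGuaranteedRollup selects hash-based
      // partitioning, and dynamic partitioning is the default.
      static PartitionsSpec givenOrDefault(PartitionsSpec given, boolean forceGuaranteedRollup)
      {
        if (given != null) {
          return given;
        }
        return forceGuaranteedRollup
               ? new HashedPartitionsSpec(null, null, null)  // maxRowsPerSegment, numShards, partitionDimensions
               : new DynamicPartitionsSpec(null, null);      // maxRowsPerSegment, maxTotalRows
      }
    }
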
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index ae40153..644b1c1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -30,6 +30,7 @@ import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.LockGranularity;
@@ -49,7 +50,6 @@ import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.ParseException;
@@ -58,7 +58,6 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireDepartment;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.RealtimeMetricsMonitor;
@@ -369,10 +368,10 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
    *
    * <ul>
    * <li>
-   * If the number of rows in a segment exceeds {@link ParallelIndexTuningConfig#maxRowsPerSegment}
+   * If the number of rows in a segment exceeds {@link DynamicPartitionsSpec#maxRowsPerSegment}
    * </li>
    * <li>
-   * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link ParallelIndexTuningConfig#maxTotalRows}
+   * If the number of rows added to {@link BaseAppenderatorDriver} so far exceeds {@link DynamicPartitionsSpec#maxTotalRows}
    * </li>
    * </ul>
    *
@@ -404,8 +403,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
 
     // Initialize maxRowsPerSegment and maxTotalRows lazily
     final ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
-    @Nullable final Integer maxRowsPerSegment = IndexTask.getValidMaxRowsPerSegment(tuningConfig);
-    @Nullable final Long maxTotalRows = IndexTask.getValidMaxTotalRows(tuningConfig);
+    final DynamicPartitionsSpec partitionsSpec = (DynamicPartitionsSpec) tuningConfig.getGivenOrDefaultPartitionsSpec();
     final long pushTimeout = tuningConfig.getPushTimeout();
     final boolean explicitIntervals = granularitySpec.bucketIntervals().isPresent();
     final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox, taskClient);
@@ -451,7 +449,11 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
           final AppenderatorDriverAddResult addResult = driver.add(inputRow, sequenceName);
 
           if (addResult.isOk()) {
-            if (addResult.isPushRequired(maxRowsPerSegment, maxTotalRows)) {
+            final boolean isPushRequired = addResult.isPushRequired(
+                partitionsSpec.getMaxRowsPerSegment(),
+                partitionsSpec.getMaxTotalRows()
+            );
+            if (isPushRequired) {
              // There can be segments waiting to be published even though no more rows will be added to them.
              // If those segments are not published here, the available space in the appenderator stays small,
              // which makes the resulting segments smaller.
@@ -485,16 +487,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
     }
   }
 
-
-  private static Granularity findSegmentGranularity(GranularitySpec granularitySpec)
-  {
-    if (granularitySpec instanceof UniformGranularitySpec) {
-      return granularitySpec.getSegmentGranularity();
-    } else {
-      return Granularities.ALL;
-    }
-  }
-
   private Appenderator newAppenderator(
       FireDepartmentMetrics metrics,
       TaskToolbox toolbox,
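
The push check above now reads both limits from a DynamicPartitionsSpec. The shape of that decision
is roughly as sketched below; this is illustrative only, since the authoritative comparison lives in
AppenderatorDriverAddResult#isPushRequired and the exact boundary handling is not shown in this diff:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;

    class PushDecisionSketch
    {
      // Illustrative only: push a segment once either the per-segment row count or the running total
      // crosses the corresponding (nullable) limit from the DynamicPartitionsSpec.
      static boolean shouldPush(long rowsInSegment, long totalRows, DynamicPartitionsSpec spec)
      {
        final Integer maxRowsPerSegment = spec.getMaxRowsPerSegment();
        final Long maxTotalRows = spec.getMaxTotalRows();
        final boolean segmentFull = maxRowsPerSegment != null && rowsInSegment >= maxRowsPerSegment;
        final boolean totalFull = maxTotalRows != null && totalRows >= maxTotalRows;
        return segmentFull || totalFull;
      }
    }
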
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 1f1b571..535b046 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -383,17 +383,17 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   {
     return new IndexTuningConfig(
         null,
-        tuningConfig.getMaxRowsPerSegment(),
+        null,
         tuningConfig.getMaxRowsInMemory(),
         tuningConfig.getMaxBytesInMemory(),
-        tuningConfig.getMaxTotalRows(),
         null,
-        tuningConfig.getNumShards(),
         null,
+        null,
+        null,
+        tuningConfig.getPartitionsSpec(),
         tuningConfig.getIndexSpec(),
         tuningConfig.getIndexSpecForIntermediatePersists(),
         tuningConfig.getMaxPendingPersists(),
-        true,
         false,
         tuningConfig.isReportParseExceptions(),
         null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 9c480d2..86f1e3f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -72,6 +74,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
         null,
         null,
         null,
+        null,
         null
     );
   }
@@ -79,11 +82,12 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
   @JsonCreator
   public ParallelIndexTuningConfig(
       @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
-      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
-      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
       @JsonProperty("numShards") @Nullable Integer numShards,
+      @JsonProperty("partitionsSpec") @Nullable PartitionsSpec partitionsSpec,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@@ -110,10 +114,10 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
         null,
         numShards,
         null,
+        getValidPartitionsSpec(maxRowsPerSegment, maxTotalRows, partitionsSpec),
         indexSpec,
         indexSpecForIntermediatePersists,
         maxPendingPersists,
-        null,
         forceGuaranteedRollup,
         reportParseExceptions,
         null,
@@ -138,6 +142,22 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
     Preconditions.checkArgument(this.maxNumSubTasks > 0, "maxNumSubTasks must be positive");
   }
 
+  private static PartitionsSpec getValidPartitionsSpec(
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      @Nullable PartitionsSpec partitionsSpec
+  )
+  {
+    if (partitionsSpec != null) {
+      if (!(partitionsSpec instanceof DynamicPartitionsSpec)) {
+        throw new UnsupportedOperationException("Parallel index task only supports dynamic partitionsSpec for now");
+      }
+      return partitionsSpec;
+    } else {
+      return new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
+    }
+  }
+
   @JsonProperty
   public int getMaxNumSubTasks()
   {
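
getValidPartitionsSpec above keeps the deprecated maxRowsPerSegment and maxTotalRows properties
working while rejecting any explicit spec that is not dynamic. A hedged usage sketch, with made-up
values and the constructor shape taken from the test changes below:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;

    class ParallelPartitionsSpecUsageSketch
    {
      // New style: pass the spec explicitly; anything other than a dynamic spec is rejected by the
      // parallel task for now.
      static DynamicPartitionsSpec explicitSpec()
      {
        return new DynamicPartitionsSpec(5_000_000, 20_000_000L); // maxRowsPerSegment, maxTotalRows
      }

      // Legacy style: when no partitionsSpec is given, the deprecated top-level properties are folded
      // into an equivalent dynamic spec.
      static DynamicPartitionsSpec fromLegacyProperties(Integer maxRowsPerSegment, Long maxTotalRows)
      {
        return new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
      }
    }
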
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 3560192..ec7f8dc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -636,7 +636,11 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
                     if (addResult.isOk()) {
                       // If the number of rows in the segment exceeds the threshold after adding a row,
                       // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
-                      if (addResult.isPushRequired(tuningConfig) && !sequenceToUse.isCheckpointed()) {
+                      final boolean isPushRequired = addResult.isPushRequired(
+                          tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
+                          tuningConfig.getPartitionsSpec().getMaxTotalRows()
+                      );
+                      if (isPushRequired && !sequenceToUse.isCheckpointed()) {
                         sequenceToCheckpoint = sequenceToUse;
                       }
                       isPersistRequired |= addResult.isPersistRequired();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 1d48ad9..bcd4e31 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.seekablestream;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -33,15 +34,12 @@ import java.util.Objects;
 
 public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig
 {
-  private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
   private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
   private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;
 
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
-  private final int maxRowsPerSegment;
-  @Nullable
-  private final Long maxTotalRows;
+  private final DynamicPartitionsSpec partitionsSpec;
   private final Period intermediatePersistPeriod;
   private final File basePersistDirectory;
   @Deprecated
@@ -87,8 +85,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
     final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
 
     this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
-    this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
-    this.maxTotalRows = maxTotalRows;
+    this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
     // initializing this to 0, it will be lazily initialized to a value
     // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
@@ -151,7 +148,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
   @JsonProperty
   public Integer getMaxRowsPerSegment()
   {
-    return maxRowsPerSegment;
+    return partitionsSpec.getMaxRowsPerSegment();
   }
 
   @JsonProperty
@@ -159,7 +156,12 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
   @Nullable
   public Long getMaxTotalRows()
   {
-    return maxTotalRows;
+    return partitionsSpec.getMaxTotalRows();
+  }
+
+  public DynamicPartitionsSpec getPartitionsSpec()
+  {
+    return partitionsSpec;
   }
 
   @Override
@@ -279,7 +281,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
     SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
     return maxRowsInMemory == that.maxRowsInMemory &&
            maxBytesInMemory == that.maxBytesInMemory &&
-           maxRowsPerSegment == that.maxRowsPerSegment &&
            maxPendingPersists == that.maxPendingPersists &&
            reportParseExceptions == that.reportParseExceptions &&
            handoffConditionTimeout == that.handoffConditionTimeout &&
@@ -288,7 +289,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
            logParseExceptions == that.logParseExceptions &&
            maxParseExceptions == that.maxParseExceptions &&
            maxSavedParseExceptions == that.maxSavedParseExceptions &&
-           Objects.equals(maxTotalRows, that.maxTotalRows) &&
+           Objects.equals(partitionsSpec, that.partitionsSpec) &&
            Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
            Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
            Objects.equals(indexSpec, that.indexSpec) &&
@@ -303,8 +304,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
     return Objects.hash(
         maxRowsInMemory,
         maxBytesInMemory,
-        maxRowsPerSegment,
-        maxTotalRows,
+        partitionsSpec,
         intermediatePersistPeriod,
         basePersistDirectory,
         maxPendingPersists,
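
For stream ingestion the change is internal only: maxRowsPerSegment and maxTotalRows keep their JSON
properties, but the tuning config now stores them in a DynamicPartitionsSpec and drops its own default
constant. An illustrative delegation sketch, not the actual class:

    import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;

    class StreamTuningDelegationSketch
    {
      private final DynamicPartitionsSpec partitionsSpec;

      // The deprecated JSON properties are wrapped in a single dynamic spec, and the getters delegate to it.
      StreamTuningDelegationSketch(Integer maxRowsPerSegment, Long maxTotalRows)
      {
        this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
      }

      Integer getMaxRowsPerSegment()
      {
        return partitionsSpec.getMaxRowsPerSegment();
      }

      Long getMaxTotalRows()
      {
        return partitionsSpec.getMaxTotalRows();
      }

      DynamicPartitionsSpec getPartitionsSpec()
      {
        return partitionsSpec;
      }
    }
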
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 80eb893..b76c998 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
 import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.RetryPolicyConfig;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
@@ -285,6 +286,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -294,7 +296,6 @@ public class CompactionTaskTest
         null,
         5000,
         true,
-        true,
         false,
         null,
         100L,
@@ -456,13 +457,14 @@ public class CompactionTaskTest
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
-        6,
+        null,
         500000,
         1000000L,
         null,
         null,
         null,
         null,
+        new HashedPartitionsSpec(6, null, null),
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -472,7 +474,6 @@ public class CompactionTaskTest
         null,
         5000,
         true,
-        true,
         false,
         null,
         100L,
@@ -520,10 +521,11 @@ public class CompactionTaskTest
         null,
         500000,
         1000000L,
-        6L,
         null,
         null,
         null,
+        null,
+        new HashedPartitionsSpec(null, 6, null),
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -533,7 +535,6 @@ public class CompactionTaskTest
         null,
         5000,
         true,
-        true,
         false,
         null,
         100L,
@@ -583,8 +584,9 @@ public class CompactionTaskTest
         1000000L,
         null,
         null,
-        3,
         null,
+        null,
+        new HashedPartitionsSpec(null, 3, null),
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -594,7 +596,6 @@ public class CompactionTaskTest
         null,
         5000,
         true,
-        true,
         false,
         null,
         100L,
@@ -844,13 +845,14 @@ public class CompactionTaskTest
   {
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
-        6,
+        null,
         500000,
         1000000L,
         null,
         null,
         null,
         null,
+        new HashedPartitionsSpec(6, null, null),
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.LZ4,
@@ -860,7 +862,6 @@ public class CompactionTaskTest
         null,
         5000,
         true,
-        true,
         false,
         null,
         100L,
@@ -1033,13 +1034,14 @@ public class CompactionTaskTest
         expectedSegmentIntervals,
         new IndexTuningConfig(
             null,
-            41943040, // automatically computed targetPartitionSize
+            null,
             500000,
             1000000L,
             Long.MAX_VALUE,
             null,
             null,
             null,
+            new HashedPartitionsSpec(41943040, null, null), // automatically computed targetPartitionSize
             new IndexSpec(
                 new RoaringBitmapSerdeFactory(true),
                 CompressionStrategy.LZ4,
@@ -1049,7 +1051,6 @@ public class CompactionTaskTest
             null,
             5000,
             true,
-            true,
             false,
             null,
             100L,
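
The CompactionTaskTest updates replace the old targetPartitionSize, numShards, and maxTotalRows
arguments with HashedPartitionsSpec instances. A hedged reading of those call sites, inferred from
the hunks above; the dimension names in the last example are made up:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.druid.indexer.partitions.HashedPartitionsSpec;

    class HashedPartitionsSpecArgsSketch
    {
      static void examples()
      {
        // First argument: target/max rows per segment, e.g. the "automatically computed targetPartitionSize".
        HashedPartitionsSpec byTargetSize = new HashedPartitionsSpec(41943040, null, null);

        // Second argument: a fixed number of shards.
        HashedPartitionsSpec byNumShards = new HashedPartitionsSpec(null, 3, null);

        // Third argument: the dimensions to hash on (the tests above pass null).
        List<String> dims = Arrays.asList("dim1", "dim2");
        HashedPartitionsSpec withDimensions = new HashedPartitionsSpec(null, 2, dims);
      }
    }
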
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index d3150ae..337da8c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -37,6 +37,8 @@ import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskReport;
@@ -987,18 +989,18 @@ public class IndexTaskTest extends IngestionTestBase
 
     final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
         null,
-        2,
         null,
         null,
         null,
         null,
         null,
         null,
+        null,
+        new HashedPartitionsSpec(2, null, null),
         indexSpec,
         null,
         null,
         true,
-        true,
         false,
         null,
         null,
@@ -1112,17 +1114,17 @@ public class IndexTaskTest extends IngestionTestBase
     // Allow up to 3 parse exceptions, and save up to 2 parse exceptions
     final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
         null,
-        2,
         null,
         null,
         null,
         null,
         null,
         null,
+        null,
+        new DynamicPartitionsSpec(2, null),
         indexSpec,
         null,
         null,
-        true,
         false,
         false,
         null,
@@ -1230,18 +1232,18 @@ public class IndexTaskTest extends IngestionTestBase
     // Allow up to 3 parse exceptions, and save up to 2 parse exceptions
     final IndexTask.IndexTuningConfig tuningConfig = new IndexTask.IndexTuningConfig(
         null,
-        2,
         null,
         null,
         null,
         null,
         null,
         null,
+        null,
+        new HashedPartitionsSpec(2, null, null),
         indexSpec,
         null,
         null,
         true,
-        true,
         false,
         null,
         null,
@@ -1666,10 +1668,10 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         numShards,
         partitionDimensions,
+        null,
         indexSpec,
         null,
         null,
-        true,
         forceGuaranteedRollup,
         reportParseException,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 189b4df..7a52bdc 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.client.indexing.ClientKillQuery;
 import org.apache.druid.guice.FirehoseModule;
 import org.apache.druid.indexer.HadoopIOConfig;
 import org.apache.druid.indexer.HadoopIngestionSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
@@ -129,7 +130,7 @@ public class TaskSerdeTest
     Assert.assertNull(tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
-        "{\"type\":\"index\", \"numShards\":10}",
+        "{\"type\":\"index\", \"numShards\":10, \"forceGuaranteedRollup\": true}",
         IndexTask.IndexTuningConfig.class
     );
 
@@ -137,7 +138,7 @@ public class TaskSerdeTest
     Assert.assertEquals(10, (int) tuningConfig.getNumShards());
 
     tuningConfig = jsonMapper.readValue(
-        "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":10}",
+        "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":10, \"forceGuaranteedRollup\": true}",
         IndexTask.IndexTuningConfig.class
     );
 
@@ -153,7 +154,7 @@ public class TaskSerdeTest
     Assert.assertEquals(10, (int) tuningConfig.getMaxRowsPerSegment());
 
     tuningConfig = jsonMapper.readValue(
-        "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":-1}",
+        "{\"type\":\"index\", \"targetPartitionSize\":-1, \"numShards\":-1, \"forceGuaranteedRollup\": true}",
         IndexTask.IndexTuningConfig.class
     );
 
@@ -167,7 +168,7 @@ public class TaskSerdeTest
     thrown.expectCause(CoreMatchers.isA(IllegalArgumentException.class));
 
     jsonMapper.readValue(
-        "{\"type\":\"index\", \"targetPartitionSize\":10, \"numShards\":10}",
+        "{\"type\":\"index\", \"targetPartitionSize\":10, \"numShards\":10, \"forceGuaranteedRollup\": true}",
         IndexTask.IndexTuningConfig.class
     );
   }
@@ -194,17 +195,17 @@ public class TaskSerdeTest
             new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
             new IndexTuningConfig(
                 null,
-                10000,
+                null,
                 10,
                 null,
                 null,
                 9999,
                 null,
                 null,
+                new DynamicPartitionsSpec(10000, null),
                 indexSpec,
                 null,
                 3,
-                true,
                 false,
                 null,
                 null,
@@ -278,17 +279,17 @@ public class TaskSerdeTest
             new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
             new IndexTuningConfig(
                 null,
-                10000,
+                null,
                 10,
                 null,
                 null,
                 null,
                 null,
                 null,
+                new DynamicPartitionsSpec(10000, null),
                 indexSpec,
                 null,
                 3,
-                true,
                 false,
                 null,
                 null,
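
The TaskSerdeTest updates pair the legacy numShards property with "forceGuaranteedRollup": true
wherever numShards is set, since numShards now maps onto a hashed partitionsSpec. A hedged
round-trip sketch, assuming an ObjectMapper configured with Druid's task modules as in the test:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.indexing.common.task.IndexTask;

    class NumShardsSerdeSketch
    {
      // jsonMapper stands in for the test's fully configured mapper; a plain ObjectMapper would not
      // have the Druid modules and injectable values registered.
      static void roundTrip(ObjectMapper jsonMapper) throws Exception
      {
        final IndexTask.IndexTuningConfig tuningConfig = jsonMapper.readValue(
            "{\"type\":\"index\", \"numShards\":10, \"forceGuaranteedRollup\": true}",
            IndexTask.IndexTuningConfig.class
        );
        // Per the assertions above, the value is surfaced through the hashed partitionsSpec.
        System.out.println(tuningConfig.getNumShards()); // expected: 10
      }
    }
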
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 2d0678a..7d90a06 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -190,6 +190,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
             null,
             null,
             null,
+            null,
             numTotalSubTasks,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index d62512e..f2b151e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -430,6 +430,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
             null,
             null,
             null,
+            null,
             NUM_SUB_TASKS,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index ccd3bd0..2f1c24f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -140,6 +140,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
             null,
             null,
             null,
+            null,
             2,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 14ddf4b..7ff4bac 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -299,6 +299,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
             null,
             null,
             null,
+            null,
             1,
             null,
             null,
@@ -367,6 +368,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
             null,
             null,
             null,
+            null,
             2,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
index 2d39302..d96cf54 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
@@ -60,11 +61,12 @@ public class ParallelIndexTuningConfigTest
   {
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
-        100,
+        null,
         10,
         1000L,
-        100L,
         null,
+        null,
+        new DynamicPartitionsSpec(100, 100L),
         new IndexSpec(
             new RoaringBitmapSerdeFactory(true),
             CompressionStrategy.UNCOMPRESSED,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index cb320e8..8db01cb 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -697,10 +697,10 @@ public class TaskLifecycleTest
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 null,
                 3,
-                true,
                 false,
                 null,
                 null,
@@ -780,10 +780,10 @@ public class TaskLifecycleTest
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 null,
                 3,
-                true,
                 false,
                 null,
                 null,
@@ -1176,10 +1176,10 @@ public class TaskLifecycleTest
                 null,
                 null,
                 null,
+                null,
                 indexSpec,
                 null,
                 null,
-                false,
                 null,
                 null,
                 null,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
index 219e553..62ee322 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorDriverAddResult.java
@@ -100,11 +100,6 @@ public class AppenderatorDriverAddResult
     return isPersistRequired;
   }
 
-  public boolean isPushRequired(AppenderatorConfig tuningConfig)
-  {
-    return isPushRequired(tuningConfig.getMaxRowsPerSegment(), tuningConfig.getMaxTotalRows());
-  }
-
   public boolean isPushRequired(@Nullable Integer maxRowsPerSegment, @Nullable Long maxTotalRows)
   {
     boolean overThreshold = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org