You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2021/11/06 07:20:44 UTC

[druid] branch master updated: Add support for multi dimension range partitioning (#11848)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d77e1a  Add support for multi dimension range partitioning (#11848)
2d77e1a is described below

commit 2d77e1a3c6a8b5cec32649d0bd9fc64bff820610
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Sat Nov 6 12:50:17 2021 +0530

    Add support for multi dimension range partitioning (#11848)
    
    This PR adds support for range partitioning on multiple dimensions. It builds on the
    concept and implementation of single dimension range partitioning.
    
    The new partition type added is range, which corresponds to a set of Dimension Range Partition classes. single_dim is now treated as a range type partition with a single partition dimension.
    
    The start and end values of a DimensionRangeShardSpec are represented
    by StringTuples, where each String in the tuple is the value of a partition dimension.
---
 .../org/apache/druid/data/input/StringTuple.java   | 135 +++++++++
 .../partitions/DimensionBasedPartitionsSpec.java   |   9 +
 .../partitions/DimensionRangePartitionsSpec.java   | 203 +++++++++++++
 .../indexer/partitions/HashedPartitionsSpec.java   |  15 +-
 .../druid/indexer/partitions/PartitionsSpec.java   |   1 +
 .../partitions/SingleDimensionPartitionsSpec.java  | 174 ++++-------
 .../partition/BuildingDimensionRangeShardSpec.java | 161 ++++++++++
 ...pec.java => DimensionRangeBucketShardSpec.java} |  68 +++--
 ...ShardSpec.java => DimensionRangeShardSpec.java} | 133 +++++----
 .../timeline/partition/PartitionBoundaries.java    |  11 +-
 .../apache/druid/timeline/partition/ShardSpec.java |   5 +-
 ...va => SingleDimensionRangeBucketShardSpec.java} |  10 +-
 .../partition/SingleDimensionShardSpec.java        |  82 +++---
 .../timeline/partition/StringPartitionChunk.java   |  34 ++-
 .../apache/druid/data/input/StringTupleTest.java   | 148 ++++++++++
 .../DimensionRangePartitionsSpecTest.java          | 208 +++++++++++++
 .../SingleDimensionPartitionsSpecTest.java         |  58 +++-
 .../BuildingDimensionRangeShardSpecTest.java       | 121 ++++++++
 .../DimensionRangeBucketShardSpecTest.java         | 190 ++++++++++++
 .../partition/DimensionRangeShardSpecTest.java     | 327 +++++++++++++++++++++
 .../partition/PartitionBoundariesTest.java         |  21 +-
 .../partition/PartitionHolderCompletenessTest.java |  62 ++++
 .../timeline/partition/ShardSpecTestUtils.java     |   1 +
 ...> SingleDimensionRangeBucketShardSpecTest.java} |  20 +-
 .../partition/SingleDimensionShardSpecTest.java    |  20 ++
 .../partition/StringPartitionChunkTest.java        | 166 +++++++++--
 .../parallel/ParallelIndexSupervisorTask.java      |   8 +-
 .../batch/parallel/ParallelIndexTuningConfig.java  |  10 +-
 .../parallel/PartialDimensionDistributionTask.java |  64 ++--
 .../parallel/PartialRangeSegmentGenerateTask.java  |  22 +-
 .../distribution/ArrayOfStringTuplesSerDe.java     |  99 +++++++
 .../parallel/distribution/StringDistribution.java  |   7 +-
 .../batch/parallel/distribution/StringSketch.java  |  44 +--
 .../parallel/distribution/StringSketchMerger.java  |   5 +-
 ...ePartitionIndexTaskInputRowIteratorBuilder.java |  73 +++--
 .../batch/partition/RangePartitionAnalysis.java    |  24 +-
 .../common/task/CompactionTaskParallelRunTest.java |  88 ++++++
 ...ePartitionCachingLocalSegmentAllocatorTest.java |  46 +--
 .../ParallelIndexSupervisorTaskSerdeTest.java      |   2 +-
 .../PartialDimensionCardinalityTaskTest.java       |   2 +-
 .../PartialDimensionDistributionTaskTest.java      |  13 +-
 .../PartialRangeSegmentGenerateTaskTest.java       |   9 +-
 ...ngePartitionAdjustingCorePartitionSizeTest.java |  18 +-
 ...ngePartitionMultiPhaseParallelIndexingTest.java |  30 +-
 .../distribution/StringSketchMergerTest.java       |   7 +-
 .../parallel/distribution/StringSketchTest.java    |  21 +-
 ...xTaskInputRowIteratorBuilderTestingFactory.java |   1 +
 ...titionIndexTaskInputRowIteratorBuilderTest.java |   4 +-
 .../server/coordinator/duty/CompactSegments.java   |   4 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  45 +++
 .../appenderator/SegmentPublisherHelperTest.java   |  38 +++
 51 files changed, 2568 insertions(+), 499 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/StringTuple.java b/core/src/main/java/org/apache/druid/data/input/StringTuple.java
new file mode 100644
index 0000000..1362ed7
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/data/input/StringTuple.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.common.base.Preconditions;
+import org.apache.druid.java.util.common.IAE;
+
+import java.util.Arrays;
+
+/**
+ * Represents a tuple of String values, typically used to represent
+ * (single-valued) dimension values for an InputRow.
+ */
+public class StringTuple implements Comparable<StringTuple>
+{
+  private final String[] values;
+
+  public static StringTuple create(String... values)
+  {
+    return new StringTuple(values);
+  }
+
+  @JsonCreator
+  public StringTuple(String[] values)
+  {
+    Preconditions.checkNotNull(values, "Array of values should not be null");
+    this.values = values;
+  }
+
+  public String get(int index)
+  {
+    return values[index];
+  }
+
+  public int size()
+  {
+    return values.length;
+  }
+
+  @JsonValue
+  public String[] toArray()
+  {
+    return values;
+  }
+
+  @Override
+  public int compareTo(StringTuple that)
+  {
+    // null is less than non-null
+    if (this == that) {
+      return 0;
+    } else if (that == null) {
+      return 1;
+    }
+
+    // Compare tuples of the same size only
+    if (size() != that.size()) {
+      throw new IAE("Cannot compare StringTuples of different sizes");
+    }
+
+    // Both tuples are empty
+    if (size() == 0) {
+      return 0;
+    }
+
+    // Compare the elements at each index until a differing element is found
+    for (int i = 0; i < size(); ++i) {
+      int comparison = nullSafeCompare(get(i), that.get(i));
+      if (comparison != 0) {
+        return comparison;
+      }
+    }
+
+    return 0;
+  }
+
+  private int nullSafeCompare(String a, String b)
+  {
+    // Treat null as smaller than non-null
+    if (a == null && b == null) {
+      return 0;
+    } else if (a == null) {
+      return -1;
+    } else if (b == null) {
+      return 1;
+    } else {
+      return a.compareTo(b);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StringTuple that = (StringTuple) o;
+    return Arrays.equals(values, that.values);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Arrays.hashCode(values);
+  }
+
+  @Override
+  public String toString()
+  {
+    return Arrays.toString(values);
+  }
+
+}
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
index 9ab4821..dd1a2db 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionBasedPartitionsSpec.java
@@ -27,7 +27,16 @@ import java.util.List;
  */
 public interface DimensionBasedPartitionsSpec extends PartitionsSpec
 {
+  String PARTITION_DIMENSIONS = "partitionDimensions";
+
   String TARGET_ROWS_PER_SEGMENT = "targetRowsPerSegment";
+  String MAX_PARTITION_SIZE = "maxPartitionSize";
+  String ASSUME_GROUPED = "assumeGrouped";
+
+  /**
+   * Message denoting that this spec is forceGuaranteedRollup compatible.
+   */
+  String FORCE_GUARANTEED_ROLLUP_COMPATIBLE = "";
 
   // Deprecated properties preserved for backward compatibility:
   @Deprecated
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpec.java
new file mode 100644
index 0000000..744f5c9
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpec.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer.partitions;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.Property;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Spec to create partitions based on value ranges of multiple dimensions.
+ * <p>
+ * A DimensionRangePartitionsSpec has the following fields:
+ * <ul>
+ *   <li>either targetRowsPerSegment or maxRowsPerSegment</li>
+ *   <li>partitionDimensions: List of dimension names to be used for partitioning</li>
+ *   <li>assumeGrouped: true if input data has already been grouped on time and dimensions</li>
+ * </ul>
+ */
+public class DimensionRangePartitionsSpec implements DimensionBasedPartitionsSpec
+{
+  public static final String NAME = "range";
+
+  private final Integer targetRowsPerSegment;
+  private final Integer maxRowsPerSegment;
+  private final List<String> partitionDimensions;
+  private final boolean assumeGrouped;
+
+  // Value of this field is derived from targetRows and maxRows
+  private final int resolvedMaxRowPerSegment;
+
+  @JsonCreator
+  public DimensionRangePartitionsSpec(
+      @JsonProperty(TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
+      @JsonProperty(MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment,
+      @JsonProperty(PARTITION_DIMENSIONS) List<String> partitionDimensions,
+      @JsonProperty(ASSUME_GROUPED) boolean assumeGrouped  // false by default
+  )
+  {
+    Preconditions.checkArgument(partitionDimensions != null, "partitionDimensions must be specified");
+    this.partitionDimensions = partitionDimensions;
+    this.assumeGrouped = assumeGrouped;
+
+    final Property<Integer> target = new Property<>(
+        TARGET_ROWS_PER_SEGMENT,
+        PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment)
+    );
+    final Property<Integer> max = new Property<>(
+        MAX_ROWS_PER_SEGMENT,
+        PartitionsSpec.resolveHistoricalNullIfNeeded(maxRowsPerSegment)
+    );
+
+    Preconditions.checkArgument(
+        (target.getValue() == null) != (max.getValue() == null),
+        "Exactly one of " + target.getName() + " or " + max.getName() + " must be present"
+    );
+
+    this.resolvedMaxRowPerSegment = resolveMaxRowsPerSegment(target, max);
+    this.targetRowsPerSegment = target.getValue();
+    this.maxRowsPerSegment = max.getValue();
+  }
+
+  private static int resolveMaxRowsPerSegment(Property<Integer> targetRows, Property<Integer> maxRows)
+  {
+    if (targetRows.getValue() != null) {
+      Preconditions.checkArgument(targetRows.getValue() > 0, targetRows.getName() + " must be greater than 0");
+      try {
+        return Math.addExact(targetRows.getValue(), (targetRows.getValue() / 2));
+      }
+      catch (ArithmeticException e) {
+        throw new IllegalArgumentException(targetRows.getName() + " is too large");
+      }
+    } else {
+      Preconditions.checkArgument(maxRows.getValue() > 0, maxRows.getName() + " must be greater than 0");
+      return maxRows.getValue();
+    }
+  }
+
+  @JsonProperty
+  @Override
+  @Nullable
+  public Integer getTargetRowsPerSegment()
+  {
+    return targetRowsPerSegment;
+  }
+
+  @Override
+  public SecondaryPartitionType getType()
+  {
+    return SecondaryPartitionType.RANGE;
+  }
+
+  /**
+   * @return Resolved value of max rows per segment.
+   */
+  @JsonIgnore
+  @Override
+  @NotNull
+  public Integer getMaxRowsPerSegment()
+  {
+    return resolvedMaxRowPerSegment;  // NOTE: This returns the *resolved* value
+  }
+
+  @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT)
+  protected Integer getMaxRowsPerSegmentForJson()
+  {
+    return maxRowsPerSegment;
+  }
+
+  @JsonProperty
+  public boolean isAssumeGrouped()
+  {
+    return assumeGrouped;
+  }
+
+  @JsonProperty
+  @Override
+  public List<String> getPartitionDimensions()
+  {
+    return partitionDimensions;
+  }
+
+  @Override
+  public String getForceGuaranteedRollupIncompatiblityReason()
+  {
+    if (getPartitionDimensions() == null || getPartitionDimensions().isEmpty()) {
+      return PARTITION_DIMENSIONS + " must be specified";
+    }
+
+    return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
+  }
+
+  @Override
+  public boolean needsDeterminePartitions(boolean useForHadoopTask)
+  {
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DimensionRangePartitionsSpec that = (DimensionRangePartitionsSpec) o;
+    return assumeGrouped == that.assumeGrouped &&
+           resolvedMaxRowPerSegment == that.resolvedMaxRowPerSegment &&
+           Objects.equals(targetRowsPerSegment, that.targetRowsPerSegment) &&
+           Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
+           Objects.equals(partitionDimensions, that.partitionDimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(
+        targetRowsPerSegment,
+        maxRowsPerSegment,
+        partitionDimensions,
+        assumeGrouped,
+        resolvedMaxRowPerSegment
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DimensionRangePartitionsSpec{" +
+           "targetRowsPerSegment=" + targetRowsPerSegment +
+           ", maxRowsPerSegment=" + maxRowsPerSegment +
+           ", partitionDimension='" + partitionDimensions + '\'' +
+           ", assumeGrouped=" + assumeGrouped +
+           ", resolvedMaxRowPerSegment=" + resolvedMaxRowPerSegment +
+           '}';
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
index 59d6f36..c9561ac 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/HashedPartitionsSpec.java
@@ -38,7 +38,6 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
   @VisibleForTesting
   static final String NUM_SHARDS = "numShards";
 
-  private static final String FORCE_GUARANTEED_ROLLUP_COMPATIBLE = "";
   private static final HashPartitionFunction DEFAULT_HASH_FUNCTION = HashPartitionFunction.MURMUR3_32_ABS;
 
   @Nullable
@@ -55,15 +54,15 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
 
   @JsonCreator
   public HashedPartitionsSpec(
-      @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
+      @JsonProperty(TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
       @JsonProperty(NUM_SHARDS) @Nullable Integer numShards,
-      @JsonProperty("partitionDimensions") @Nullable List<String> partitionDimensions,
+      @JsonProperty(PARTITION_DIMENSIONS) @Nullable List<String> partitionDimensions,
       @JsonProperty("partitionFunction") @Nullable HashPartitionFunction partitionFunction,
 
       // Deprecated properties preserved for backward compatibility:
-      @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
+      @Deprecated @JsonProperty(TARGET_PARTITION_SIZE) @Nullable
           Integer targetPartitionSize,  // prefer targetRowsPerSegment
-      @Deprecated @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable
+      @Deprecated @JsonProperty(MAX_ROWS_PER_SEGMENT) @Nullable
           Integer maxRowsPerSegment  // prefer targetRowsPerSegment
   )
   {
@@ -74,14 +73,14 @@ public class HashedPartitionsSpec implements DimensionBasedPartitionsSpec
 
     // targetRowsPerSegment, targetPartitionSize, and maxRowsPerSegment are aliases
     Property<Integer> target = Checks.checkAtMostOneNotNull(
-        DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
+        TARGET_ROWS_PER_SEGMENT,
         adjustedTargetRowsPerSegment,
-        DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
+        TARGET_PARTITION_SIZE,
         adjustedTargetPartitionSize
     );
     target = Checks.checkAtMostOneNotNull(
         target,
-        new Property<>(PartitionsSpec.MAX_ROWS_PER_SEGMENT, adjustedMaxRowsPerSegment)
+        new Property<>(MAX_ROWS_PER_SEGMENT, adjustedMaxRowsPerSegment)
     );
 
     // targetRowsPerSegment/targetPartitionSize/maxRowsPerSegment and numShards are incompatible
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
index cb3b43e..f19095e 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/PartitionsSpec.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.NAME, value = SingleDimensionPartitionsSpec.class),
     @JsonSubTypes.Type(name = SingleDimensionPartitionsSpec.OLD_NAME, value = SingleDimensionPartitionsSpec.class),  // for backward compatibility
+    @JsonSubTypes.Type(name = DimensionRangePartitionsSpec.NAME, value = DimensionRangePartitionsSpec.class),
     @JsonSubTypes.Type(name = HashedPartitionsSpec.NAME, value = HashedPartitionsSpec.class),
     @JsonSubTypes.Type(name = DynamicPartitionsSpec.NAME, value = DynamicPartitionsSpec.class)
 })
diff --git a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index c4b0085..75cdaa0 100644
--- a/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/core/src/main/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -21,83 +21,80 @@ package org.apache.druid.indexer.partitions;
 
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.Checks;
 import org.apache.druid.indexer.Property;
 
 import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
 import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Partition a segment by a single dimension.
  */
-public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSpec
+public class SingleDimensionPartitionsSpec extends DimensionRangePartitionsSpec
 {
   public static final String NAME = "single_dim";
   static final String OLD_NAME = "dimension";  // for backward compatibility
 
-  private static final String PARITION_DIMENSION = "partitionDimension";
-  private static final String MAX_PARTITION_SIZE = "maxPartitionSize";
-  private static final String FORCE_GUARANTEED_ROLLUP_COMPATIBLE = "";
+  private static final String PARTITION_DIMENSION = "partitionDimension";
 
-  private final Integer targetRowsPerSegment;
-  private final Integer maxRowsPerSegment;
   private final String partitionDimension;
-  private final boolean assumeGrouped;
-
-  // Values for these fields are derived from the one above:
-  private final int resolvedMaxRowPerSegment;
 
   @JsonCreator
   public SingleDimensionPartitionsSpec(
-      @JsonProperty(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
-      @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment,
-      @JsonProperty(PARITION_DIMENSION) @Nullable String partitionDimension,
-      @JsonProperty("assumeGrouped") boolean assumeGrouped,  // false by default
+      @JsonProperty(TARGET_ROWS_PER_SEGMENT) @Nullable Integer targetRowsPerSegment,
+      @JsonProperty(MAX_ROWS_PER_SEGMENT) @Nullable Integer maxRowsPerSegment,
+      @JsonProperty(PARTITION_DIMENSION) @Nullable String partitionDimension,
+      @JsonProperty(ASSUME_GROUPED) boolean assumeGrouped,  // false by default
 
       // Deprecated properties preserved for backward compatibility:
-      @Deprecated @JsonProperty(DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE) @Nullable
+      @Deprecated @JsonProperty(TARGET_PARTITION_SIZE) @Nullable
           Integer targetPartitionSize,  // prefer targetRowsPerSegment
       @Deprecated @JsonProperty(MAX_PARTITION_SIZE) @Nullable
           Integer maxPartitionSize  // prefer maxRowsPerSegment
   )
   {
-    Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRowsPerSegment);
-    Integer adjustedMaxRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(maxRowsPerSegment);
+    super(
+        computeTargetRows(targetRowsPerSegment, targetPartitionSize),
+        computeMaxRows(maxRowsPerSegment, maxPartitionSize),
+        partitionDimension == null ? Collections.emptyList() : Collections.singletonList(partitionDimension),
+        assumeGrouped
+    );
+    this.partitionDimension = partitionDimension;
+  }
+
+  private static Integer computeTargetRows(Integer targetRows, Integer targetPartitionSize)
+  {
+    Integer adjustedTargetRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(targetRows);
     Integer adjustedTargetPartitionSize = PartitionsSpec.resolveHistoricalNullIfNeeded(targetPartitionSize);
-    Integer adjustedMaxPartitionSize = PartitionsSpec.resolveHistoricalNullIfNeeded(maxPartitionSize);
 
     Property<Integer> target = Checks.checkAtMostOneNotNull(
-        DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT,
+        TARGET_ROWS_PER_SEGMENT,
         adjustedTargetRowsPerSegment,
-        DimensionBasedPartitionsSpec.TARGET_PARTITION_SIZE,
+        TARGET_PARTITION_SIZE,
         adjustedTargetPartitionSize
     );
 
+    return target.getValue();
+  }
+
+  private static Integer computeMaxRows(Integer maxRows, Integer maxPartitionSize)
+  {
+    Integer adjustedMaxRowsPerSegment = PartitionsSpec.resolveHistoricalNullIfNeeded(maxRows);
+    Integer adjustedMaxPartitionSize = PartitionsSpec.resolveHistoricalNullIfNeeded(maxPartitionSize);
+
     Property<Integer> max = Checks.checkAtMostOneNotNull(
-        PartitionsSpec.MAX_ROWS_PER_SEGMENT,
+        MAX_ROWS_PER_SEGMENT,
         adjustedMaxRowsPerSegment,
         MAX_PARTITION_SIZE,
         adjustedMaxPartitionSize
     );
 
-    Preconditions.checkArgument(
-        (target.getValue() == null) != (max.getValue() == null),
-        "Exactly one of " + target.getName() + " or " + max.getName() + " must be present"
-    );
-
-    this.partitionDimension = partitionDimension;
-    this.assumeGrouped = assumeGrouped;
-    this.targetRowsPerSegment = target.getValue();
-    this.maxRowsPerSegment = max.getValue();
-
-    this.resolvedMaxRowPerSegment = resolveMaxRowsPerSegment(target, max);
+    return max.getValue();
   }
 
   @VisibleForTesting
@@ -111,53 +108,6 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSp
     this(targetRowsPerSegment, maxRowsPerSegment, partitionDimension, assumeGrouped, null, null);
   }
 
-  private static int resolveMaxRowsPerSegment(Property<Integer> target, Property<Integer> max)
-  {
-    final int resolvedValue;
-
-    if (target.getValue() != null) {
-      Preconditions.checkArgument(target.getValue() > 0, target.getName() + " must be greater than 0");
-      try {
-        resolvedValue = Math.addExact(target.getValue(), (target.getValue() / 2));
-      }
-      catch (ArithmeticException e) {
-        throw new IllegalArgumentException(target.getName() + " is too large");
-      }
-    } else {
-      Preconditions.checkArgument(max.getValue() > 0, max.getName() + " must be greater than 0");
-      resolvedValue = max.getValue();
-    }
-    return resolvedValue;
-  }
-
-  @JsonProperty
-  @Override
-  @Nullable
-  public Integer getTargetRowsPerSegment()
-  {
-    return targetRowsPerSegment;
-  }
-
-  @Override
-  public SecondaryPartitionType getType()
-  {
-    return SecondaryPartitionType.RANGE;
-  }
-
-  @JsonIgnore
-  @Override
-  @NotNull
-  public Integer getMaxRowsPerSegment()
-  {
-    return resolvedMaxRowPerSegment;  // NOTE: This returns the *resolved* value
-  }
-
-  @JsonProperty(PartitionsSpec.MAX_ROWS_PER_SEGMENT)
-  private Integer getMaxRowsPerSegmentForJson()
-  {
-    return maxRowsPerSegment;
-  }
-
   @JsonProperty
   @Nullable
   public String getPartitionDimension()
@@ -165,36 +115,38 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSp
     return partitionDimension;
   }
 
-  @JsonProperty
-  public boolean isAssumeGrouped()
+  /**
+   * Returns a Map to be used for serializing objects of this class. This is to
+   * ensure that a new field added in {@link DimensionRangePartitionsSpec} does
+   * not get serialized when serializing a {@code SingleDimensionPartitionsSpec}.
+   *
+   * @return A map containing only the keys {@code "partitionDimension"},
+   * {@code "targetRowsPerSegment"}, {@code "maxRowsPerSegment"} and
+   * {@code "assumeGrouped"}.
+   */
+  @JsonValue
+  public Map<String, Object> getSerializableObject()
   {
-    return assumeGrouped;
-  }
+    Map<String, Object> jsonMap = new HashMap<>();
+    jsonMap.put(TARGET_ROWS_PER_SEGMENT, getTargetRowsPerSegment());
+    jsonMap.put(MAX_ROWS_PER_SEGMENT, getMaxRowsPerSegmentForJson());
+    jsonMap.put(PARTITION_DIMENSION, getPartitionDimension());
+    jsonMap.put(ASSUME_GROUPED, isAssumeGrouped());
 
-  @JsonIgnore
-  @Override
-  public List<String> getPartitionDimensions()
-  {
-    return partitionDimension == null ? Collections.emptyList() : Collections.singletonList(partitionDimension);
+    return jsonMap;
   }
 
   @Override
   public String getForceGuaranteedRollupIncompatiblityReason()
   {
     if (getPartitionDimension() == null) {
-      return PARITION_DIMENSION + " must be specified";
+      return PARTITION_DIMENSION + " must be specified";
     }
 
     return FORCE_GUARANTEED_ROLLUP_COMPATIBLE;
   }
 
   @Override
-  public boolean needsDeterminePartitions(boolean useForHadoopTask)
-  {
-    return true;
-  }
-
-  @Override
   public boolean equals(Object o)
   {
     if (this == o) {
@@ -204,34 +156,24 @@ public class SingleDimensionPartitionsSpec implements DimensionBasedPartitionsSp
       return false;
     }
     SingleDimensionPartitionsSpec that = (SingleDimensionPartitionsSpec) o;
-    return assumeGrouped == that.assumeGrouped &&
-           resolvedMaxRowPerSegment == that.resolvedMaxRowPerSegment &&
-           Objects.equals(targetRowsPerSegment, that.targetRowsPerSegment) &&
-           Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) &&
-           Objects.equals(partitionDimension, that.partitionDimension);
+    return super.equals(that);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(
-        targetRowsPerSegment,
-        maxRowsPerSegment,
-        partitionDimension,
-        assumeGrouped,
-        resolvedMaxRowPerSegment
-    );
+    return super.hashCode();
   }
 
   @Override
   public String toString()
   {
     return "SingleDimensionPartitionsSpec{" +
-           "targetRowsPerSegment=" + targetRowsPerSegment +
-           ", maxRowsPerSegment=" + maxRowsPerSegment +
+           "targetRowsPerSegment=" + getTargetRowsPerSegment() +
+           ", maxRowsPerSegment=" + getMaxRowsPerSegmentForJson() +
            ", partitionDimension='" + partitionDimension + '\'' +
-           ", assumeGrouped=" + assumeGrouped +
-           ", resolvedMaxRowPerSegment=" + resolvedMaxRowPerSegment +
+           ", assumeGrouped=" + isAssumeGrouped() +
+           ", resolvedMaxRowPerSegment=" + getMaxRowsPerSegment() +
            '}';
   }
 }
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
new file mode 100644
index 0000000..88f5e85
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.StringTuple;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * See {@link BuildingShardSpec} for how this class is used.
+ * Holds a bucketId plus a dimension range; {@link #convert} turns it into a {@link DimensionRangeShardSpec}.
+ * @see DimensionRangeShardSpec
+ */
+public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<DimensionRangeShardSpec>
+{
+  public static final String TYPE = "building_range"; // type name registered in ShardSpec's @JsonSubTypes
+
+  private final int bucketId;
+  private final List<String> dimensions; // partition dimensions; start/end hold one value per dimension
+  @Nullable
+  private final StringTuple start; // null is treated as "no lower bound" by DimensionRangeShardSpec.isInChunk
+  @Nullable
+  private final StringTuple end; // null is treated as "no upper bound" by DimensionRangeShardSpec.isInChunk
+  private final int partitionId; // exposed through JSON and getPartitionNum() as "partitionNum"
+
+  @JsonCreator
+  public BuildingDimensionRangeShardSpec(
+      @JsonProperty("bucketId") int bucketId,
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("start") @Nullable StringTuple start,
+      @JsonProperty("end") @Nullable StringTuple end,
+      @JsonProperty("partitionNum") int partitionNum
+  )
+  {
+    this.bucketId = bucketId;
+    this.dimensions = dimensions;
+    this.start = start;
+    this.end = end;
+    this.partitionId = partitionNum;
+  }
+
+  @JsonProperty("dimensions")
+  public List<String> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @Nullable
+  @JsonProperty("start")
+  public StringTuple getStart()
+  {
+    return start;
+  }
+
+  @Nullable
+  @JsonProperty("end")
+  public StringTuple getEnd()
+  {
+    return end;
+  }
+
+  @Override
+  @JsonProperty("partitionNum")
+  public int getPartitionNum()
+  {
+    return partitionId;
+  }
+
+  @Override
+  @JsonProperty("bucketId")
+  public int getBucketId()
+  {
+    return bucketId;
+  }
+
+  @Override
+  public DimensionRangeShardSpec convert(int numCorePartitions)
+  {
+    return dimensions != null && dimensions.size() == 1 // one dimension: build the SingleDimensionShardSpec subclass
+           ? new SingleDimensionShardSpec(
+        dimensions.get(0),
+        firstOrNull(start),
+        firstOrNull(end),
+        partitionId,
+        numCorePartitions
+    ) : new DimensionRangeShardSpec(
+        dimensions,
+        start,
+        end,
+        partitionId,
+        numCorePartitions
+    );
+  }
+
+  private static String firstOrNull(@Nullable StringTuple tuple) // first element, or null for a null/empty tuple
+  {
+    return tuple == null || tuple.size() < 1 ? null : tuple.get(0);
+  }
+
+  @Override
+  public <T> PartitionChunk<T> createChunk(T obj)
+  {
+    return new NumberedPartitionChunk<>(partitionId, 0, obj); // 0 fills the numCorePartitions slot - TODO confirm intent
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BuildingDimensionRangeShardSpec that = (BuildingDimensionRangeShardSpec) o;
+    return bucketId == that.bucketId &&
+           partitionId == that.partitionId &&
+           Objects.equals(dimensions, that.dimensions) &&
+           Objects.equals(start, that.start) &&
+           Objects.equals(end, that.end);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(bucketId, dimensions, start, end, partitionId);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "BuildingDimensionRangeShardSpec{" +
+           "bucketId=" + bucketId +
+           ", dimensions='" + dimensions + '\'' + // label matches the "dimensions" JSON property
+           ", start='" + start + '\'' +
+           ", end='" + end + '\'' +
+           ", partitionNum=" + partitionId +
+           '}';
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
similarity index 57%
copy from core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java
copy to core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
index 00b01f1..1cdedb2 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
@@ -21,7 +21,9 @@ package org.apache.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.ISE;
 
 import javax.annotation.Nullable;
@@ -31,29 +33,39 @@ import java.util.Objects;
 /**
  * See {@link BucketNumberedShardSpec} for how this class is used.
  *
- * @see BuildingSingleDimensionShardSpec
+ * @see BuildingDimensionRangeShardSpec
  */
-public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
+public class DimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingDimensionRangeShardSpec>
 {
-  public static final String TYPE = "bucket_single_dim";
+  public static final String TYPE = "bucket_range";
 
   private final int bucketId;
-  private final String dimension;
+  private final List<String> dimensions;
   @Nullable
-  private final String start;
+  private final StringTuple start;
   @Nullable
-  private final String end;
+  private final StringTuple end;
 
   @JsonCreator
-  public RangeBucketShardSpec(
+  public DimensionRangeBucketShardSpec(
       @JsonProperty("bucketId") int bucketId,
-      @JsonProperty("dimension") String dimension,
-      @JsonProperty("start") @Nullable String start,
-      @JsonProperty("end") @Nullable String end
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("start") @Nullable StringTuple start,
+      @JsonProperty("end") @Nullable StringTuple end
   )
   {
+    // Verify that the tuple sizes and number of dimensions are the same
+    Preconditions.checkArgument(
+        start == null || start.size() == dimensions.size(),
+        "Start tuple must either be null or of the same size as the number of partition dimensions"
+    );
+    Preconditions.checkArgument(
+        end == null || end.size() == dimensions.size(),
+        "End tuple must either be null or of the same size as the number of partition dimensions"
+    );
+
     this.bucketId = bucketId;
-    this.dimension = dimension;
+    this.dimensions = dimensions;
     this.start = start;
     this.end = end;
   }
@@ -66,34 +78,29 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
   }
 
   @JsonProperty
-  public String getDimension()
+  public List<String> getDimensions()
   {
-    return dimension;
+    return dimensions;
   }
 
   @Nullable
   @JsonProperty
-  public String getStart()
+  public StringTuple getStart()
   {
     return start;
   }
 
   @Nullable
   @JsonProperty
-  public String getEnd()
+  public StringTuple getEnd()
   {
     return end;
   }
 
   @Override
-  public BuildingSingleDimensionShardSpec convert(int partitionId)
-  {
-    return new BuildingSingleDimensionShardSpec(bucketId, dimension, start, end, partitionId);
-  }
-
-  public boolean isInChunk(InputRow inputRow)
+  public BuildingDimensionRangeShardSpec convert(int partitionId)
   {
-    return SingleDimensionShardSpec.isInChunk(dimension, start, end, inputRow);
+    return new BuildingDimensionRangeShardSpec(bucketId, dimensions, start, end, partitionId);
   }
 
   @Override
@@ -101,7 +108,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
   {
     return (long timestamp, InputRow row) -> {
       for (ShardSpec spec : shardSpecs) {
-        if (((RangeBucketShardSpec) spec).isInChunk(row)) {
+        if (((DimensionRangeBucketShardSpec) spec).isInChunk(row)) {
           return spec;
         }
       }
@@ -109,6 +116,11 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
     };
   }
 
+  private boolean isInChunk(InputRow inputRow)
+  {
+    return DimensionRangeShardSpec.isInChunk(dimensions, start, end, inputRow); // null start/end means unbounded
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -118,9 +130,9 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    RangeBucketShardSpec bucket = (RangeBucketShardSpec) o;
+    DimensionRangeBucketShardSpec bucket = (DimensionRangeBucketShardSpec) o;
     return bucketId == bucket.bucketId &&
-           Objects.equals(dimension, bucket.dimension) &&
+           Objects.equals(dimensions, bucket.dimensions) &&
            Objects.equals(start, bucket.start) &&
            Objects.equals(end, bucket.end);
   }
@@ -128,15 +140,15 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
   @Override
   public int hashCode()
   {
-    return Objects.hash(bucketId, dimension, start, end);
+    return Objects.hash(bucketId, dimensions, start, end);
   }
 
   @Override
   public String toString()
   {
-    return "RangeBucket{" +
+    return "DimensionRangeBucketShardSpec{" +
            ", bucketId=" + bucketId +
-           ", dimension='" + dimension + '\'' +
+           ", dimension='" + dimensions + '\'' +
            ", start='" + start + '\'' +
            ", end='" + end + '\'' +
            '}';
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
similarity index 55%
copy from core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
copy to core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
index da5024a..d4a964c 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
@@ -21,73 +21,83 @@ package org.apache.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.ISE;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * {@link ShardSpec} for range partitioning based on a single dimension
+ * {@link ShardSpec} for partitioning based on ranges of one or more dimensions.
  */
-public class SingleDimensionShardSpec implements ShardSpec
+public class DimensionRangeShardSpec implements ShardSpec
 {
   public static final int UNKNOWN_NUM_CORE_PARTITIONS = -1;
 
-  private final String dimension;
+  private final List<String> dimensions;
   @Nullable
-  private final String start;
+  private final StringTuple start;
   @Nullable
-  private final String end;
+  private final StringTuple end;
   private final int partitionNum;
   private final int numCorePartitions;
 
+  private final String firstDimStart;
+  private final String firstDimEnd;
+
   /**
-   * @param dimension    partition dimension
+   * @param dimensions   partition dimensions
    * @param start        inclusive start of this range
    * @param end          exclusive end of this range
    * @param partitionNum unique ID for this shard
    */
   @JsonCreator
-  public SingleDimensionShardSpec(
-      @JsonProperty("dimension") String dimension,
-      @JsonProperty("start") @Nullable String start,
-      @JsonProperty("end") @Nullable String end,
+  public DimensionRangeShardSpec(
+      @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("start") @Nullable StringTuple start,
+      @JsonProperty("end") @Nullable StringTuple end,
       @JsonProperty("partitionNum") int partitionNum,
       @JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility
   )
   {
     Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
-    this.dimension = Preconditions.checkNotNull(dimension, "dimension");
+    Preconditions.checkArgument(
+        dimensions != null && !dimensions.isEmpty(),
+        "dimensions should be non-null and non-empty"
+    );
+
+    this.dimensions = dimensions;
     this.start = start;
     this.end = end;
     this.partitionNum = partitionNum;
     this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions;
+    this.firstDimStart = getFirstValueOrNull(start);
+    this.firstDimEnd = getFirstValueOrNull(end);
   }
 
-  @JsonProperty("dimension")
-  public String getDimension()
+  @JsonProperty("dimensions")
+  public List<String> getDimensions()
   {
-    return dimension;
+    return dimensions;
   }
 
   @Nullable
   @JsonProperty("start")
-  public String getStart()
+  public StringTuple getStartTuple()
   {
     return start;
   }
 
   @Nullable
   @JsonProperty("end")
-  public String getEnd()
+  public StringTuple getEndTuple()
   {
     return end;
   }
@@ -106,17 +116,22 @@ public class SingleDimensionShardSpec implements ShardSpec
     return numCorePartitions;
   }
 
+  public boolean isNumCorePartitionsUnknown()
+  {
+    return numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS; // sentinel stored for a null constructor arg
+  }
+
   @Override
   public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
   {
     return createLookup(shardSpecs);
   }
 
-  static ShardSpecLookup createLookup(List<? extends ShardSpec> shardSpecs)
+  private static ShardSpecLookup createLookup(List<? extends ShardSpec> shardSpecs)
   {
     return (long timestamp, InputRow row) -> {
       for (ShardSpec spec : shardSpecs) {
-        if (((SingleDimensionShardSpec) spec).isInChunk(row)) {
+        if (((DimensionRangeShardSpec) spec).isInChunk(row)) {
           return spec;
         }
       }
@@ -127,20 +142,20 @@ public class SingleDimensionShardSpec implements ShardSpec
   @Override
   public List<String> getDomainDimensions()
   {
-    return ImmutableList.of(dimension);
+    return Collections.unmodifiableList(dimensions);
   }
 
-  private Range<String> getRange()
+  private Range<String> getFirstDimRange()
   {
     Range<String> range;
-    if (start == null && end == null) {
+    if (firstDimStart == null && firstDimEnd == null) {
       range = Range.all();
-    } else if (start == null) {
-      range = Range.atMost(end);
-    } else if (end == null) {
-      range = Range.atLeast(start);
+    } else if (firstDimStart == null) {
+      range = Range.atMost(firstDimEnd);
+    } else if (firstDimEnd == null) {
+      range = Range.atLeast(firstDimStart);
     } else {
-      range = Range.closed(start, end);
+      range = Range.closed(firstDimStart, firstDimEnd);
     }
     return range;
   }
@@ -148,57 +163,53 @@ public class SingleDimensionShardSpec implements ShardSpec
   @Override
   public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
   {
-    RangeSet<String> rangeSet = domain.get(dimension);
+    RangeSet<String> rangeSet = domain.get(dimensions.get(0));
     if (rangeSet == null) {
       return true;
     }
-    return !rangeSet.subRangeSet(getRange()).isEmpty();
+    return !rangeSet.subRangeSet(getFirstDimRange()).isEmpty();
   }
 
   @Override
   public <T> PartitionChunk<T> createChunk(T obj)
   {
-    if (numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS) {
-      return new StringPartitionChunk<>(start, end, partitionNum, obj);
+    if (isNumCorePartitionsUnknown()) {
+      return StringPartitionChunk.make(start, end, partitionNum, obj);
     } else {
       return new NumberedPartitionChunk<>(partitionNum, numCorePartitions, obj);
     }
   }
 
-  @VisibleForTesting
-  boolean isInChunk(InputRow inputRow)
+  private boolean isInChunk(InputRow inputRow)
   {
-    return isInChunk(dimension, start, end, inputRow);
+    return isInChunk(dimensions, start, end, inputRow);
   }
 
-  private static boolean checkValue(@Nullable String start, @Nullable String end, String value)
+  public static boolean isInChunk(
+      List<String> dimensions,
+      @Nullable StringTuple start,
+      @Nullable StringTuple end,
+      InputRow inputRow
+  )
   {
-    if (value == null) {
-      return start == null;
+    final String[] inputDimensionValues = new String[dimensions.size()];
+    for (int i = 0; i < dimensions.size(); ++i) {
+      // Get the values of this dimension, treat multiple values as null
+      List<String> values = inputRow.getDimension(dimensions.get(i));
+      inputDimensionValues[i] = values != null && values.size() == 1 ? values.get(0) : null;
     }
+    final StringTuple inputRowTuple = StringTuple.create(inputDimensionValues);
 
-    if (start == null) {
-      return end == null || value.compareTo(end) < 0;
-    }
+    int inputVsStart = inputRowTuple.compareTo(start);
+    int inputVsEnd = inputRowTuple.compareTo(end);
 
-    return value.compareTo(start) >= 0 &&
-           (end == null || value.compareTo(end) < 0);
+    return (inputVsStart >= 0 || start == null)
+           && (inputVsEnd < 0 || end == null);
   }
 
-  public static boolean isInChunk(
-      String dimension,
-      @Nullable String start,
-      @Nullable String end,
-      InputRow inputRow
-  )
+  private static String getFirstValueOrNull(StringTuple values)
   {
-    final List<String> values = inputRow.getDimension(dimension);
-
-    if (values == null || values.size() != 1) {
-      return checkValue(start, end, null);
-    } else {
-      return checkValue(start, end, values.get(0));
-    }
+    return values != null && values.size() > 0 ? values.get(0) : null;
   }
 
   @Override
@@ -210,10 +221,10 @@ public class SingleDimensionShardSpec implements ShardSpec
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) o;
+    DimensionRangeShardSpec shardSpec = (DimensionRangeShardSpec) o;
     return partitionNum == shardSpec.partitionNum &&
            numCorePartitions == shardSpec.numCorePartitions &&
-           Objects.equals(dimension, shardSpec.dimension) &&
+           Objects.equals(dimensions, shardSpec.dimensions) &&
            Objects.equals(start, shardSpec.start) &&
            Objects.equals(end, shardSpec.end);
   }
@@ -221,14 +232,14 @@ public class SingleDimensionShardSpec implements ShardSpec
   @Override
   public int hashCode()
   {
-    return Objects.hash(dimension, start, end, partitionNum, numCorePartitions);
+    return Objects.hash(dimensions, start, end, partitionNum, numCorePartitions);
   }
 
   @Override
   public String toString()
   {
-    return "SingleDimensionShardSpec{" +
-           "dimension='" + dimension + '\'' +
+    return "DimensionRangeShardSpec{" +
+           "dimensions='" + dimensions + '\'' +
            ", start='" + start + '\'' +
            ", end='" + end + '\'' +
            ", partitionNum=" + partitionNum +
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/PartitionBoundaries.java b/core/src/main/java/org/apache/druid/timeline/partition/PartitionBoundaries.java
index 7cd11bf..2247a91 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/PartitionBoundaries.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/PartitionBoundaries.java
@@ -20,6 +20,7 @@
 package org.apache.druid.timeline.partition;
 
 import com.google.common.collect.ForwardingList;
+import org.apache.druid.data.input.StringTuple;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,9 +32,9 @@ import java.util.stream.Collectors;
 /**
  * List of range partition boundaries.
  */
-public class PartitionBoundaries extends ForwardingList<String> implements List<String>
+public class PartitionBoundaries extends ForwardingList<StringTuple> implements List<StringTuple>
 {
-  private final List<String> delegate;
+  private final List<StringTuple> delegate;
 
   // For jackson
   @SuppressWarnings("unused")
@@ -45,7 +46,7 @@ public class PartitionBoundaries extends ForwardingList<String> implements List<
   /**
    * @param partitions Elements corresponding to evenly-spaced fractional ranks of the distribution
    */
-  public PartitionBoundaries(String... partitions)
+  public PartitionBoundaries(StringTuple... partitions)
   {
     if (partitions.length == 0) {
       delegate = Collections.emptyList();
@@ -53,7 +54,7 @@ public class PartitionBoundaries extends ForwardingList<String> implements List<
     }
 
     // Future improvement: Handle skewed partitions better (e.g., many values are repeated).
-    List<String> partitionBoundaries = Arrays.stream(partitions)
+    List<StringTuple> partitionBoundaries = Arrays.stream(partitions)
                                              .distinct()
                                              .collect(Collectors.toCollection(ArrayList::new));
 
@@ -71,7 +72,7 @@ public class PartitionBoundaries extends ForwardingList<String> implements List<
   }
 
   @Override
-  protected List<String> delegate()
+  protected List<StringTuple> delegate()
   {
     return delegate;
   }
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index 5098a3c..16a60c1 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -35,6 +35,7 @@ import java.util.Map;
 @JsonSubTypes({
     @JsonSubTypes.Type(name = "none", value = NoneShardSpec.class),
     @JsonSubTypes.Type(name = "single", value = SingleDimensionShardSpec.class),
+    @JsonSubTypes.Type(name = "range", value = DimensionRangeShardSpec.class),
     @JsonSubTypes.Type(name = "linear", value = LinearShardSpec.class),
     @JsonSubTypes.Type(name = "numbered", value = NumberedShardSpec.class),
     @JsonSubTypes.Type(name = "hashed", value = HashBasedNumberedShardSpec.class),
@@ -44,11 +45,13 @@ import java.util.Map;
     @JsonSubTypes.Type(name = BuildingNumberedShardSpec.TYPE, value = BuildingNumberedShardSpec.class),
     @JsonSubTypes.Type(name = BuildingHashBasedNumberedShardSpec.TYPE, value = BuildingHashBasedNumberedShardSpec.class),
     @JsonSubTypes.Type(name = BuildingSingleDimensionShardSpec.TYPE, value = BuildingSingleDimensionShardSpec.class),
+    @JsonSubTypes.Type(name = BuildingDimensionRangeShardSpec.TYPE, value = BuildingDimensionRangeShardSpec.class),
     // BucketShardSpecs are the shardSpec with missing partitionId and numCorePartitions.
     // These shardSpecs must not be used in segment push.
     // See BucketShardSpec for more details.
     @JsonSubTypes.Type(name = HashBucketShardSpec.TYPE, value = HashBucketShardSpec.class),
-    @JsonSubTypes.Type(name = RangeBucketShardSpec.TYPE, value = RangeBucketShardSpec.class)
+    @JsonSubTypes.Type(name = SingleDimensionRangeBucketShardSpec.TYPE, value = SingleDimensionRangeBucketShardSpec.class),
+    @JsonSubTypes.Type(name = DimensionRangeBucketShardSpec.TYPE, value = DimensionRangeBucketShardSpec.class)
 })
 public interface ShardSpec
 {
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
similarity index 90%
rename from core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java
rename to core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
index 00b01f1..1ee236d 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/RangeBucketShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
@@ -33,7 +33,7 @@ import java.util.Objects;
  *
  * @see BuildingSingleDimensionShardSpec
  */
-public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
+public class SingleDimensionRangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSingleDimensionShardSpec>
 {
   public static final String TYPE = "bucket_single_dim";
 
@@ -45,7 +45,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
   private final String end;
 
   @JsonCreator
-  public RangeBucketShardSpec(
+  public SingleDimensionRangeBucketShardSpec(
       @JsonProperty("bucketId") int bucketId,
       @JsonProperty("dimension") String dimension,
       @JsonProperty("start") @Nullable String start,
@@ -101,7 +101,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
   {
     return (long timestamp, InputRow row) -> {
       for (ShardSpec spec : shardSpecs) {
-        if (((RangeBucketShardSpec) spec).isInChunk(row)) {
+        if (((SingleDimensionRangeBucketShardSpec) spec).isInChunk(row)) {
           return spec;
         }
       }
@@ -118,7 +118,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    RangeBucketShardSpec bucket = (RangeBucketShardSpec) o;
+    SingleDimensionRangeBucketShardSpec bucket = (SingleDimensionRangeBucketShardSpec) o;
     return bucketId == bucket.bucketId &&
            Objects.equals(dimension, bucket.dimension) &&
            Objects.equals(start, bucket.start) &&
@@ -134,7 +134,7 @@ public class RangeBucketShardSpec implements BucketNumberedShardSpec<BuildingSin
   @Override
   public String toString()
   {
-    return "RangeBucket{" +
+    return "SingleDimensionRangeBucketShardSpec{" +
            ", bucketId=" + bucketId +
            ", dimension='" + dimension + '\'' +
            ", start='" + start + '\'' +
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
index da5024a..e4bff90 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -21,15 +21,17 @@ package org.apache.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.ISE;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -37,7 +39,7 @@ import java.util.Objects;
 /**
  * {@link ShardSpec} for range partitioning based on a single dimension
  */
-public class SingleDimensionShardSpec implements ShardSpec
+public class SingleDimensionShardSpec extends DimensionRangeShardSpec
 {
   public static final int UNKNOWN_NUM_CORE_PARTITIONS = -1;
 
@@ -46,8 +48,6 @@ public class SingleDimensionShardSpec implements ShardSpec
   private final String start;
   @Nullable
   private final String end;
-  private final int partitionNum;
-  private final int numCorePartitions;
 
   /**
    * @param dimension    partition dimension
@@ -64,49 +64,57 @@ public class SingleDimensionShardSpec implements ShardSpec
       @JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions // nullable for backward compatibility
   )
   {
-    Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
-    this.dimension = Preconditions.checkNotNull(dimension, "dimension");
+    super(
+        dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
+        start == null ? null : StringTuple.create(start),
+        end == null ? null : StringTuple.create(end),
+        partitionNum,
+        numCorePartitions
+    );
+    this.dimension = dimension;
     this.start = start;
     this.end = end;
-    this.partitionNum = partitionNum;
-    this.numCorePartitions = numCorePartitions == null ? UNKNOWN_NUM_CORE_PARTITIONS : numCorePartitions;
   }
 
-  @JsonProperty("dimension")
+  /**
+   * Builds the map that Jackson writes in place of this object (see {@link JsonValue}), so that
+   * properties declared on {@link DimensionRangeShardSpec}, such as {@code "dimensions"}, are
+   * not emitted when serializing a {@code SingleDimensionShardSpec}.
+   *
+   * @return a new {@link HashMap} containing only the keys {@code "dimension"}, {@code "start"},
+   * {@code "end"}, {@code "partitionNum"} and {@code "numCorePartitions"}; null values are kept.
+   */
+  @JsonValue
+  public Map<String, Object> getSerializableObject()
+  {
+    Map<String, Object> jsonMap = new HashMap<>();
+    jsonMap.put("start", start); // the plain String bound, not the StringTuple passed to super
+    jsonMap.put("end", end); // likewise a plain String; may be null
+    jsonMap.put("dimension", dimension);
+    jsonMap.put("partitionNum", getPartitionNum());
+    jsonMap.put("numCorePartitions", getNumCorePartitions());
+
+    return jsonMap;
+  }
+
   public String getDimension()
   {
     return dimension;
   }
 
   @Nullable
-  @JsonProperty("start")
   public String getStart()
   {
     return start;
   }
 
   @Nullable
-  @JsonProperty("end")
   public String getEnd()
   {
     return end;
   }
 
   @Override
-  @JsonProperty("partitionNum")
-  public int getPartitionNum()
-  {
-    return partitionNum;
-  }
-
-  @Override
-  @JsonProperty
-  public int getNumCorePartitions()
-  {
-    return numCorePartitions;
-  }
-
-  @Override
   public ShardSpecLookup getLookup(final List<? extends ShardSpec> shardSpecs)
   {
     return createLookup(shardSpecs);
@@ -124,12 +132,6 @@ public class SingleDimensionShardSpec implements ShardSpec
     };
   }
 
-  @Override
-  public List<String> getDomainDimensions()
-  {
-    return ImmutableList.of(dimension);
-  }
-
   private Range<String> getRange()
   {
     Range<String> range;
@@ -158,10 +160,10 @@ public class SingleDimensionShardSpec implements ShardSpec
   @Override
   public <T> PartitionChunk<T> createChunk(T obj)
   {
-    if (numCorePartitions == UNKNOWN_NUM_CORE_PARTITIONS) {
-      return new StringPartitionChunk<>(start, end, partitionNum, obj);
+    if (isNumCorePartitionsUnknown()) {
+      return StringPartitionChunk.makeForSingleDimension(start, end, getPartitionNum(), obj);
     } else {
-      return new NumberedPartitionChunk<>(partitionNum, numCorePartitions, obj);
+      return new NumberedPartitionChunk<>(getPartitionNum(), getNumCorePartitions(), obj);
     }
   }
 
@@ -211,8 +213,8 @@ public class SingleDimensionShardSpec implements ShardSpec
       return false;
     }
     SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) o;
-    return partitionNum == shardSpec.partitionNum &&
-           numCorePartitions == shardSpec.numCorePartitions &&
+    return getPartitionNum() == shardSpec.getPartitionNum() &&
+           getNumCorePartitions() == shardSpec.getNumCorePartitions() &&
            Objects.equals(dimension, shardSpec.dimension) &&
            Objects.equals(start, shardSpec.start) &&
            Objects.equals(end, shardSpec.end);
@@ -221,7 +223,7 @@ public class SingleDimensionShardSpec implements ShardSpec
   @Override
   public int hashCode()
   {
-    return Objects.hash(dimension, start, end, partitionNum, numCorePartitions);
+    return Objects.hash(dimension, start, end, getPartitionNum(), getNumCorePartitions());
   }
 
   @Override
@@ -231,8 +233,8 @@ public class SingleDimensionShardSpec implements ShardSpec
            "dimension='" + dimension + '\'' +
            ", start='" + start + '\'' +
            ", end='" + end + '\'' +
-           ", partitionNum=" + partitionNum +
-           ", numCorePartitions=" + numCorePartitions +
+           ", partitionNum=" + getPartitionNum() +
+           ", numCorePartitions=" + getNumCorePartitions() +
            '}';
   }
 }
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/StringPartitionChunk.java b/core/src/main/java/org/apache/druid/timeline/partition/StringPartitionChunk.java
index 28d7505..640dfcc 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/StringPartitionChunk.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/StringPartitionChunk.java
@@ -19,23 +19,37 @@
 
 package org.apache.druid.timeline.partition;
 
+import org.apache.druid.data.input.StringTuple;
+
+import java.util.Objects;
+
 /**
  */
 public class StringPartitionChunk<T> implements PartitionChunk<T>
 {
-  private final String start;
-  private final String end;
+  private final StringTuple start;
+  private final StringTuple end;
   private final int chunkNumber;
   private final T object;
 
-  public static <T> StringPartitionChunk<T> make(String start, String end, int chunkNumber, T obj)
+  public static <T> StringPartitionChunk<T> makeForSingleDimension(String start, String end, int chunkNumber, T obj)
+  {
+    return new StringPartitionChunk<>(
+        start == null ? null : StringTuple.create(start),
+        end == null ? null : StringTuple.create(end),
+        chunkNumber,
+        obj
+    );
+  }
+
+  public static <T> StringPartitionChunk<T> make(StringTuple start, StringTuple end, int chunkNumber, T obj)
   {
-    return new StringPartitionChunk<T>(start, end, chunkNumber, obj);
+    return new StringPartitionChunk<>(start, end, chunkNumber, obj);
   }
 
-  public StringPartitionChunk(
-      String start,
-      String end,
+  private StringPartitionChunk(
+      StringTuple start,
+      StringTuple end,
       int chunkNumber,
       T object
   )
@@ -58,7 +72,7 @@ public class StringPartitionChunk<T> implements PartitionChunk<T>
     if (chunk instanceof StringPartitionChunk) {
       StringPartitionChunk<T> stringChunk = (StringPartitionChunk<T>) chunk;
 
-      return !stringChunk.isStart() && stringChunk.start.equals(end);
+      return !stringChunk.isStart() && Objects.equals(stringChunk.start, end);
     }
 
     return false;
@@ -111,8 +125,8 @@ public class StringPartitionChunk<T> implements PartitionChunk<T>
   @Override
   public int hashCode()
   {
-    int result = start != null ? start.hashCode() : 0;
-    result = 31 * result + (end != null ? end.hashCode() : 0);
+    int result = Objects.hashCode(start);
+    result = 31 * result + Objects.hashCode(end);
     result = 31 * result + (object != null ? object.hashCode() : 0);
     return result;
   }
diff --git a/core/src/test/java/org/apache/druid/data/input/StringTupleTest.java b/core/src/test/java/org/apache/druid/data/input/StringTupleTest.java
new file mode 100644
index 0000000..7ce439d
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/data/input/StringTupleTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StringTupleTest
+{
+
+  @Test
+  public void testSize()
+  {
+    StringTuple tuple = StringTuple.create("a", "b", "c");
+    assertEquals(3, tuple.size());
+  }
+
+  @Test
+  public void testGet()
+  {
+    StringTuple tuple = StringTuple.create("a", "b", "c");
+    assertEquals("a", tuple.get(0));
+    assertEquals("b", tuple.get(1));
+    assertEquals("c", tuple.get(2));
+  }
+
+  @Test
+  public void testToArray()
+  {
+    // Compare element-wise; assertEquals(Object[], Object[]) is a deprecated JUnit 4 overload
+    org.junit.Assert.assertArrayEquals(new String[]{"a", "b", "c"}, StringTuple.create("a", "b", "c").toArray());
+  }
+
+  @Test
+  public void testWithNullValues()
+  {
+    StringTuple tuple = StringTuple.create("a", null, "b");
+    assertEquals("a", tuple.get(0));
+    assertNull(tuple.get(1));
+    assertEquals("b", tuple.get(2));
+
+    tuple = StringTuple.create(null, null);
+    assertNull(tuple.get(0));
+    assertNull(tuple.get(1));
+
+    tuple = StringTuple.create((String) null);
+    assertNull(tuple.get(0));
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    StringTuple original = StringTuple.create("a", "b", "c");
+    final ObjectMapper mapper = new ObjectMapper();
+    String json = mapper.writeValueAsString(original);
+
+    StringTuple deserialized = mapper.readValue(json, StringTuple.class);
+    assertEquals(original, deserialized);
+  }
+
+  @Test
+  public void testCompareTo()
+  {
+    StringTuple lhs = StringTuple.create("c", "10");
+
+    // Objects equal to lhs
+    assertEquals(
+        0,
+        lhs.compareTo(StringTuple.create("c", "10"))
+    );
+
+    // Objects smaller than lhs
+    assertTrue(lhs.compareTo(null) > 0);
+    assertTrue(lhs.compareTo(StringTuple.create(null, null)) > 0);
+    assertTrue(lhs.compareTo(StringTuple.create("c", "09")) > 0);
+    assertTrue(lhs.compareTo(StringTuple.create("b", "01")) > 0);
+
+    // Objects bigger than lhs
+    assertTrue(lhs.compareTo(StringTuple.create("c", "11")) < 0);
+    assertTrue(lhs.compareTo(StringTuple.create("d", "01")) < 0);
+  }
+
+  @Test
+  public void testEquals()
+  {
+    assertEquals(
+        StringTuple.create((String) null),
+        StringTuple.create((String) null)
+    );
+    assertEquals(
+        StringTuple.create("a"),
+        StringTuple.create("a")
+    );
+    assertEquals(
+        StringTuple.create(null, null, null),
+        StringTuple.create(null, null, null)
+    );
+    assertEquals(
+        StringTuple.create("a", "10", "z"),
+        StringTuple.create("a", "10", "z")
+    );
+    assertEquals(
+        new StringTuple(new String[]{"a", "10", "z"}),
+        StringTuple.create("a", "10", "z")
+    );
+
+    assertNotEquals(
+        StringTuple.create(null, null, null),
+        StringTuple.create(null, null)
+    );
+    assertNotEquals(
+        StringTuple.create("a"),
+        StringTuple.create((String) null)
+    );
+    assertNotEquals(
+        StringTuple.create("a", "b"),
+        StringTuple.create("a", "c")
+    );
+    assertNotEquals(
+        StringTuple.create("a", "b"),
+        StringTuple.create("c", "b")
+    );
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpecTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpecTest.java
new file mode 100644
index 0000000..7e31569
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/partitions/DimensionRangePartitionsSpecTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer.partitions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class DimensionRangePartitionsSpecTest
+{
+  private static final Integer TARGET_ROWS_PER_SEGMENT = 1;
+  private static final Integer MAX_ROWS_PER_SEGMENT = null;
+  private static final Integer HISTORICAL_NULL = PartitionsSpec.HISTORICAL_NULL;
+  private static final List<String> PARTITION_DIMENSIONS = Arrays.asList("a", "b");
+  private static final boolean ASSUME_GROUPED = false;
+  private static final DimensionRangePartitionsSpec SPEC = new DimensionRangePartitionsSpec(
+      TARGET_ROWS_PER_SEGMENT,
+      MAX_ROWS_PER_SEGMENT,
+      PARTITION_DIMENSIONS,
+      ASSUME_GROUPED
+  );
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void serde()
+  {
+    String json = serialize(SPEC);
+    DimensionRangePartitionsSpec spec = deserialize(json);
+    Assert.assertEquals(SPEC, spec);
+  }
+
+  @Test
+  public void havingNeitherTargetNorMaxForbidden()
+  {
+    new TestSpecBuilder()
+        .testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
+  }
+
+  @Test
+  public void targetRowsPerSegmentMustBePositive()
+  {
+    new TestSpecBuilder()
+        .targetRowsPerSegment(0)
+        .testIllegalArgumentException("targetRowsPerSegment must be greater than 0");
+  }
+
+  @Test
+  public void targetRowsPerSegmentHistoricalNull()
+  {
+    new TestSpecBuilder()
+        .targetRowsPerSegment(HISTORICAL_NULL)
+        .testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
+  }
+
+  @Test
+  public void targetMaxRowsPerSegmentOverflows()
+  {
+    new TestSpecBuilder()
+        .targetRowsPerSegment(Integer.MAX_VALUE)
+        .testIllegalArgumentException("targetRowsPerSegment is too large");
+  }
+
+  @Test
+  public void maxRowsPerSegmentMustBePositive()
+  {
+    new TestSpecBuilder()
+        .maxRowsPerSegment(0)
+        .testIllegalArgumentException("maxRowsPerSegment must be greater than 0");
+  }
+
+  @Test
+  public void maxRowsPerSegmentHistoricalNull()
+  {
+    new TestSpecBuilder()
+        .maxRowsPerSegment(HISTORICAL_NULL)
+        .testIllegalArgumentException("Exactly one of targetRowsPerSegment or maxRowsPerSegment must be present");
+  }
+
+  @Test
+  public void resolvesMaxFromTargetRowsPerSegment()
+  {
+    DimensionRangePartitionsSpec spec = new TestSpecBuilder()
+        .targetRowsPerSegment(123)
+        .build();
+    Assert.assertEquals(184, spec.getMaxRowsPerSegment().intValue());
+  }
+
+  @Test
+  public void resolvesMaxFromMaxRowsPerSegment()
+  {
+    DimensionRangePartitionsSpec spec = new TestSpecBuilder()
+        .maxRowsPerSegment(123)
+        .build();
+    Assert.assertEquals(123, spec.getMaxRowsPerSegment().intValue());
+  }
+
+  @Test
+  public void getPartitionDimensionFromNull()
+  {
+    // Verify that partitionDimensions must be non-null
+    new TestSpecBuilder()
+        .partitionDimensions(null)
+        .testIllegalArgumentException("partitionDimensions must be specified");
+  }
+
+  @Test
+  public void getPartitionDimensionFromNonNull()
+  {
+    List<String> partitionDimensions = Collections.singletonList("a");
+    DimensionRangePartitionsSpec spec = new TestSpecBuilder()
+        .targetRowsPerSegment(10)
+        .partitionDimensions(partitionDimensions)
+        .build();
+    Assert.assertEquals(partitionDimensions, spec.getPartitionDimensions());
+  }
+
+  private static String serialize(Object object)
+  {
+    try {
+      return OBJECT_MAPPER.writeValueAsString(object);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static DimensionRangePartitionsSpec deserialize(String serialized)
+  {
+    try {
+      return OBJECT_MAPPER.readValue(serialized, DimensionRangePartitionsSpec.class);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Spec builder used in this test.
+   */
+  private class TestSpecBuilder
+  {
+    private Integer targetRowsPerSegment;
+    private Integer maxRowsPerSegment;
+    private List<String> partitionDimensions = Collections.emptyList();
+
+    TestSpecBuilder targetRowsPerSegment(Integer targetRowsPerSegment)
+    {
+      this.targetRowsPerSegment = targetRowsPerSegment;
+      return this;
+    }
+
+    TestSpecBuilder maxRowsPerSegment(Integer maxRowsPerSegment)
+    {
+      this.maxRowsPerSegment = maxRowsPerSegment;
+      return this;
+    }
+
+    TestSpecBuilder partitionDimensions(List<String> partitionDimensions)
+    {
+      this.partitionDimensions = partitionDimensions;
+      return this;
+    }
+
+    void testIllegalArgumentException(String exceptionExpectedMessage)
+    {
+      exception.expect(IllegalArgumentException.class);
+      exception.expectMessage(exceptionExpectedMessage);
+      build();
+    }
+
+    DimensionRangePartitionsSpec build()
+    {
+      return new DimensionRangePartitionsSpec(
+          targetRowsPerSegment,
+          maxRowsPerSegment,
+          partitionDimensions,
+          ASSUME_GROUPED
+      );
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java
index 51883a6..291a8d9 100644
--- a/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java
+++ b/core/src/test/java/org/apache/druid/indexer/partitions/SingleDimensionPartitionsSpecTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.Collections;
+import java.util.Map;
 
 public class SingleDimensionPartitionsSpecTest
 {
@@ -69,6 +70,57 @@ public class SingleDimensionPartitionsSpecTest
   }
 
   @Test
+  public void testDeserializeWithUnrecognizedProperty()
+  {
+    // Verify that a single_dim spec rejects the extra "partitionDimensions" field
+    // used by DimensionRangePartitionsSpec instead of silently accepting it
+    String json = "{"
+                  + "\"type\":\"single_dim\""
+                  + ",\"targetPartitionSize\":100"
+                  + ",\"partitionDimension\":\"dim1\""
+                  + ",\"partitionDimensions\":[\"dim2\"]"
+                  + "}";
+
+    try {
+      deserialize(json);
+      Assert.fail("Expected deserialization to fail for unrecognized field partitionDimensions");
+    }
+    catch (RuntimeException e) {
+      final String expected = "UnrecognizedPropertyException: Unrecognized field \"partitionDimensions\"";
+      Assert.assertTrue(e.getMessage().contains(expected));
+    }
+  }
+
+  @Test
+  public void testGetSerializableObjectContainsNoExtraField()
+  {
+    // This test verifies that a serialized SingleDimensionPartitionsSpec has no field
+    // from the parent DimensionRangePartitionsSpec
+    verifySerializableFields(SPEC);
+    verifySerializableFields(
+        new SingleDimensionPartitionsSpec(
+            null,
+            null,
+            "abc",
+            false,
+            100,
+            null
+        )
+    );
+  }
+
+  private void verifySerializableFields(SingleDimensionPartitionsSpec spec)
+  {
+    Map<String, Object> jsonMap = spec.getSerializableObject();
+
+    Assert.assertEquals(4, jsonMap.size());
+    Assert.assertTrue(jsonMap.containsKey(PartitionsSpec.MAX_ROWS_PER_SEGMENT));
+    Assert.assertTrue(jsonMap.containsKey(DimensionBasedPartitionsSpec.TARGET_ROWS_PER_SEGMENT));
+    Assert.assertTrue(jsonMap.containsKey(DimensionBasedPartitionsSpec.ASSUME_GROUPED));
+    Assert.assertTrue(jsonMap.containsKey("partitionDimension"));
+  }
+
+  @Test
   public void havingBothTargetForbidden()
   {
     new Tester()
@@ -114,7 +166,7 @@ public class SingleDimensionPartitionsSpecTest
   {
     new Tester()
         .targetPartitionSize(0)
-        .testIllegalArgumentException("targetPartitionSize must be greater than 0");
+        .testIllegalArgumentException("targetRowsPerSegment must be greater than 0");
   }
 
   @Test
@@ -130,7 +182,7 @@ public class SingleDimensionPartitionsSpecTest
   {
     new Tester()
         .targetPartitionSize(Integer.MAX_VALUE)
-        .testIllegalArgumentException("targetPartitionSize is too large");
+        .testIllegalArgumentException("targetRowsPerSegment is too large");
   }
 
   @Test
@@ -154,7 +206,7 @@ public class SingleDimensionPartitionsSpecTest
   {
     new Tester()
         .maxPartitionSize(0)
-        .testIllegalArgumentException("maxPartitionSize must be greater than 0");
+        .testIllegalArgumentException("maxRowsPerSegment must be greater than 0");
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
new file mode 100644
index 0000000..9817f8d
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.StringTuple;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class BuildingDimensionRangeShardSpecTest
+{
+  @Test
+  public void testConvert()
+  {
+    Assert.assertEquals(
+        new DimensionRangeShardSpec(
+            Arrays.asList("dim1", "dim2"),
+            StringTuple.create("start1", "start2"),
+            StringTuple.create("end1", "end2"),
+            5,
+            10
+        ),
+        new BuildingDimensionRangeShardSpec(
+            1,
+            Arrays.asList("dim1", "dim2"),
+            StringTuple.create("start1", "start2"),
+            StringTuple.create("end1", "end2"),
+            5
+        ).convert(10)
+    );
+  }
+
+  @Test
+  public void testConvert_withSingleDimension()
+  {
+    Assert.assertEquals(
+        new SingleDimensionShardSpec(
+            "dim",
+            "start",
+            "end",
+            5,
+            10
+        ),
+        new BuildingDimensionRangeShardSpec(
+            1,
+            Collections.singletonList("dim"),
+            StringTuple.create("start"),
+            StringTuple.create("end"),
+            5
+        ).convert(10)
+    );
+  }
+
+  @Test
+  public void testCreateChunk()
+  {
+    Assert.assertEquals(
+        new NumberedPartitionChunk<>(5, 0, "test"),
+        new BuildingDimensionRangeShardSpec(
+            1,
+            Arrays.asList("dim1", "dim2"),
+            StringTuple.create("start1", "start2"),
+            StringTuple.create("end1", "end2"),
+            5
+        ).createChunk("test")
+    );
+  }
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
+    mapper.registerSubtypes(
+        new NamedType(BuildingDimensionRangeShardSpec.class, BuildingDimensionRangeShardSpec.TYPE)
+    );
+    mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
+    final BuildingDimensionRangeShardSpec original = new BuildingDimensionRangeShardSpec(
+        1,
+        Arrays.asList("dim1", "dim2"),
+        StringTuple.create("start1", "start2"),
+        StringTuple.create("end1", "end2"),
+        5
+    );
+    final String json = mapper.writeValueAsString(original);
+    final BuildingDimensionRangeShardSpec fromJson = (BuildingDimensionRangeShardSpec) mapper.readValue(
+        json,
+        ShardSpec.class
+    );
+    Assert.assertEquals(original, fromJson);
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(BuildingDimensionRangeShardSpec.class).usingGetClass().verify();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
new file mode 100644
index 0000000..3285c15
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues.Std;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class DimensionRangeBucketShardSpecTest
+{
+
+  private static final List<String> DIMENSIONS = Arrays.asList("dim1", "dim2");
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testConvert()
+  {
+    Assert.assertEquals(
+        new BuildingDimensionRangeShardSpec(
+            1,
+            DIMENSIONS,
+            StringTuple.create("start1", "start2"),
+            StringTuple.create("end1", "end2"),
+            5
+        ),
+        new DimensionRangeBucketShardSpec(
+            1,
+            DIMENSIONS,
+            StringTuple.create("start1", "start2"),
+            StringTuple.create("end1", "end2")
+        ).convert(5)
+    );
+  }
+
+  @Test
+  public void testCreateChunk()
+  {
+    Assert.assertEquals(
+        new NumberedPartitionChunk<>(1, 0, "test"),
+        new DimensionRangeBucketShardSpec(
+            1,
+            DIMENSIONS,
+            StringTuple.create("start1", "start2"),
+            StringTuple.create("end1", "end2")
+        ).createChunk("test")
+    );
+  }
+
+  @Test
+  public void testShardSpecLookup()
+  {
+    final List<ShardSpec> shardSpecs = ImmutableList.of(
+        new DimensionRangeBucketShardSpec(0, DIMENSIONS, null, StringTuple.create("c", "12")),
+        new DimensionRangeBucketShardSpec(
+            1,
+            DIMENSIONS,
+            StringTuple.create("f", "13"),
+            StringTuple.create("i", "9")
+        ),
+        new DimensionRangeBucketShardSpec(2, DIMENSIONS, StringTuple.create("i", "9"), null)
+    );
+    final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
+    final long currentTime = DateTimes.nowUtc().getMillis();
+    Assert.assertEquals(
+        shardSpecs.get(0),
+        lookup.getShardSpec(
+            currentTime,
+            new MapBasedInputRow(
+                currentTime,
+                DIMENSIONS,
+                ImmutableMap.of(DIMENSIONS.get(0), "a", DIMENSIONS.get(1), "12", "time", currentTime)
+            )
+        )
+    );
+    Assert.assertEquals(
+        shardSpecs.get(1),
+        lookup.getShardSpec(
+            currentTime,
+            new MapBasedInputRow(
+                currentTime,
+                DIMENSIONS, ImmutableMap.of(DIMENSIONS.get(0), "g", DIMENSIONS.get(1), "8", "time", currentTime)
+            )
+        )
+    );
+    Assert.assertEquals(
+        shardSpecs.get(2),
+        lookup.getShardSpec(
+            currentTime,
+            new MapBasedInputRow(
+                currentTime,
+                DIMENSIONS, ImmutableMap.of(DIMENSIONS.get(0), "k", DIMENSIONS.get(1), "14", "time", currentTime)
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
+    mapper.registerSubtypes(new NamedType(
+        DimensionRangeBucketShardSpec.class,
+        DimensionRangeBucketShardSpec.TYPE
+    ));
+    mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
+    final DimensionRangeBucketShardSpec original = new DimensionRangeBucketShardSpec(
+        1,
+        DIMENSIONS,
+        StringTuple.create("start1", "start2"),
+        StringTuple.create("end1", "end2")
+    );
+    final String json = mapper.writeValueAsString(original);
+    final DimensionRangeBucketShardSpec fromJson = (DimensionRangeBucketShardSpec) mapper.readValue(
+        json,
+        ShardSpec.class
+    );
+    Assert.assertEquals(original, fromJson);
+  }
+
+  @Test
+  public void testInvalidStartTupleSize()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "Start tuple must either be null or of the same size as the number of partition dimensions"
+    );
+
+    new DimensionRangeBucketShardSpec(
+        1,
+        DIMENSIONS,
+        StringTuple.create("a"),
+        null
+    );
+  }
+
+  @Test
+  public void testInvalidEndTupleSize()
+  {
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        "End tuple must either be null or of the same size as the number of partition dimensions"
+    );
+
+    new DimensionRangeBucketShardSpec(
+        1,
+        DIMENSIONS,
+        StringTuple.create("a", "b"),
+        StringTuple.create("e", "f", "g")
+    );
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class).usingGetClass().verify();
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
new file mode 100644
index 0000000..8256122
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class DimensionRangeShardSpecTest
+{
+
+  private final List<String> dimensions = new ArrayList<>();
+
+  @Test
+  public void testIsInChunk()
+  {
+    setDimensions("d1", "d2");
+
+    final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
+        dimensions,
+        StringTuple.create("India", "Delhi"),
+        StringTuple.create("Spain", "Valencia"),
+        10,
+        null
+    );
+
+    // Verify that entries starting from (India, Delhi) until (Spain, Valencia) are in chunk
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India", "Delhi")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India", "Kolkata")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Japan", "Tokyo")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Spain", "Barcelona")
+    ));
+
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("India", "Bengaluru")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("Spain", "Valencia")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("United Kingdom", "London")
+    ));
+  }
+
+  @Test
+  public void testIsInChunk_withNullStart()
+  {
+    setDimensions("d1", "d2");
+
+    final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
+        dimensions,
+        null,
+        StringTuple.create("Spain", "Valencia"),
+        10,
+        null
+    );
+
+    // Verify that anything before (Spain, Valencia) is in chunk
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow(null, null)
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow(null, "Lyon")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India", null)
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India", "Kolkata")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Japan", "Tokyo")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Spain", "Barcelona")
+    ));
+
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("Spain", "Valencia")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("United Kingdom", "London")
+    ));
+  }
+
+  @Test
+  public void testIsInChunk_withNullEnd()
+  {
+    setDimensions("d1", "d2");
+
+    final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
+        dimensions,
+        StringTuple.create("France", "Lyon"),
+        null,
+        10,
+        null
+    );
+
+    // Verify that anything starting from (France, Lyon) is in chunk
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("France", "Paris")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Japan", "Tokyo")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Spain", null)
+    ));
+
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow(null, null)
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("France", null)
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("France", "Bordeaux")
+    ));
+  }
+
+  @Test
+  public void testIsInChunk_withFirstDimEqual()
+  {
+    setDimensions("d1", "d2");
+
+    final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
+        dimensions,
+        StringTuple.create("France", "Bordeaux"),
+        StringTuple.create("France", "Paris"),
+        10,
+        null
+    );
+
+    // Verify that entries starting from (France, Bordeaux) until (France, Paris) are in chunk
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("France", "Bordeaux")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("France", "Lyon")
+    ));
+
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("France", "Paris")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("France", "Avignon")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("France", "Toulouse")
+    ));
+  }
+
+  @Test
+  public void testIsInChunk_withSingleDimension()
+  {
+    setDimensions("d1");
+
+    final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
+        dimensions,
+        StringTuple.create("India"),
+        StringTuple.create("Spain"),
+        10,
+        null
+    );
+
+    // Verify that entries starting from (India) until (Spain) are in chunk
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Japan")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Malaysia")
+    ));
+
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("Belgium")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("Spain")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("United Kingdom")
+    ));
+  }
+
+  @Test
+  public void testIsInChunk_withMultiValues()
+  {
+    setDimensions("d1", "d2");
+
+    final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
+        dimensions,
+        StringTuple.create("India", "Delhi"),
+        StringTuple.create("Spain", "Valencia"),
+        10,
+        null
+    );
+
+    // Verify that entries starting from (India, Delhi) until (Spain, Valencia) are in chunk
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India", "Delhi")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("India", "Kolkata")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Japan", "Tokyo")
+    ));
+    assertTrue(isInChunk(
+        shardSpec,
+        createRow("Spain", "Barcelona")
+    ));
+
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("India", "Bengaluru")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("Spain", "Valencia")
+    ));
+    assertFalse(isInChunk(
+        shardSpec,
+        createRow("United Kingdom", "London")
+    ));
+  }
+
+  /**
+   * Checks if the given InputRow is in the chunk represented by the given shard spec.
+   */
+  private boolean isInChunk(DimensionRangeShardSpec shardSpec, InputRow row)
+  {
+    return DimensionRangeShardSpec.isInChunk(
+        shardSpec.getDimensions(),
+        shardSpec.getStartTuple(),
+        shardSpec.getEndTuple(),
+        row
+    );
+  }
+
+  private void setDimensions(String... dimensionNames)
+  {
+    dimensions.clear();
+    dimensions.addAll(Arrays.asList(dimensionNames));
+  }
+
+  private InputRow createRow(String... values)
+  {
+    Map<String, Object> valueMap = new HashMap<>();
+    for (int i = 0; i < dimensions.size(); ++i) {
+      valueMap.put(dimensions.get(i), values[i]);
+    }
+    return new MapBasedInputRow(DateTimes.nowUtc(), dimensions, valueMap);
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/PartitionBoundariesTest.java b/core/src/test/java/org/apache/druid/timeline/partition/PartitionBoundariesTest.java
index f564459..e87818c 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/PartitionBoundariesTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/PartitionBoundariesTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.StringTuple;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -30,17 +31,23 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+
 public class PartitionBoundariesTest
 {
   private PartitionBoundaries target;
-  private String[] values;
-  private List<String> expected;
+  private StringTuple[] values;
+  private List<StringTuple> expected;
 
   @Before
   public void setup()
   {
-    values = new String[]{"a", "dup", "dup", "z"};
-    expected = Arrays.asList(null, "dup", null);
+    values = new StringTuple[]{
+        StringTuple.create("a"),
+        StringTuple.create("dup"),
+        StringTuple.create("dup"),
+        StringTuple.create("z")
+    };
+    expected = Arrays.asList(null, StringTuple.create("dup"), null);
     target = new PartitionBoundaries(values);
   }
 
@@ -53,13 +60,13 @@ public class PartitionBoundariesTest
   @Test(expected = UnsupportedOperationException.class)
   public void isImmutable()
   {
-    target.add("should fail");
+    target.add(StringTuple.create("should fail"));
   }
 
   @Test
   public void cannotBeIndirectlyModified()
   {
-    values[1] = "changed";
+    values[1] = StringTuple.create("changed");
     Assert.assertEquals(expected, target);
   }
 
@@ -72,7 +79,7 @@ public class PartitionBoundariesTest
   @Test
   public void handlesRepeatedValue()
   {
-    Assert.assertEquals(Arrays.asList(null, null), new PartitionBoundaries("a", "a", "a"));
+    Assert.assertEquals(Arrays.asList(null, null), new PartitionBoundaries(StringTuple.create("a"), StringTuple.create("a"), StringTuple.create("a")));
   }
 
   @Test
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
index 58e1fbd..8131ea7 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
@@ -21,12 +21,14 @@ package org.apache.druid.timeline.partition;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Collections;
 import java.util.List;
 
 @RunWith(Parameterized.class)
@@ -76,6 +78,66 @@ public class PartitionHolderCompletenessTest
                 "%s with missing numCorePartitions",
                 SingleDimensionShardSpec.class.getSimpleName()
             )
+        },
+        new Object[]{
+            // Simulate empty range buckets with DimensionRangeShardSpec
+            ImmutableList.of(
+                new DimensionRangeShardSpec(
+                    Collections.singletonList("dim"),
+                    null,
+                    StringTuple.create("aaa"),
+                    0,
+                    3
+                ),
+                new DimensionRangeShardSpec(
+                    Collections.singletonList("dim"),
+                    StringTuple.create("ttt"),
+                    StringTuple.create("zzz"),
+                    2,
+                    3
+                ),
+                new DimensionRangeShardSpec(
+                    Collections.singletonList("dim"),
+                    StringTuple.create("bbb"),
+                    StringTuple.create("fff"),
+                    1,
+                    3
+                )
+            ),
+            StringUtils.format(
+                "%s with empty buckets",
+                DimensionRangeShardSpec.class.getSimpleName()
+            )
+        },
+        new Object[]{
+            // Simulate old format segments with missing numCorePartitions
+            ImmutableList.of(
+                new DimensionRangeShardSpec(
+                    Collections.singletonList("dim"),
+                    StringTuple.create("bbb"),
+                    StringTuple.create("fff"),
+                    1,
+                    null
+                ),
+                new DimensionRangeShardSpec(
+                    Collections.singletonList("dim"),
+                    StringTuple.create("fff"),
+                    null,
+                    2,
+                    null
+                ),
+                new DimensionRangeShardSpec(
+                    Collections.singletonList("dim"),
+                    null,
+                    StringTuple.create("bbb"),
+                    0,
+                    null
+                )
+            ),
+            StringUtils.format(
+                "%s with missing numCorePartitions",
+                DimensionRangeShardSpec.class.getSimpleName()
+            )
         }
     );
   }
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java b/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java
index dd613af..3314525 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/ShardSpecTestUtils.java
@@ -48,4 +48,5 @@ public class ShardSpecTestUtils
   private ShardSpecTestUtils()
   {
   }
+
 }
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/RangeBucketShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java
similarity index 77%
rename from core/src/test/java/org/apache/druid/timeline/partition/RangeBucketShardSpecTest.java
rename to core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java
index d2c06e0..fb8d5f4 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/RangeBucketShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpecTest.java
@@ -33,14 +33,14 @@ import org.junit.Test;
 
 import java.util.List;
 
-public class RangeBucketShardSpecTest
+public class SingleDimensionRangeBucketShardSpecTest
 {
   @Test
   public void testConvert()
   {
     Assert.assertEquals(
         new BuildingSingleDimensionShardSpec(1, "dim", "start", "end", 5),
-        new RangeBucketShardSpec(1, "dim", "start", "end").convert(5)
+        new SingleDimensionRangeBucketShardSpec(1, "dim", "start", "end").convert(5)
     );
   }
 
@@ -49,7 +49,7 @@ public class RangeBucketShardSpecTest
   {
     Assert.assertEquals(
         new NumberedPartitionChunk<>(1, 0, "test"),
-        new RangeBucketShardSpec(1, "dim", "start", "end").createChunk("test")
+        new SingleDimensionRangeBucketShardSpec(1, "dim", "start", "end").createChunk("test")
     );
   }
 
@@ -57,9 +57,9 @@ public class RangeBucketShardSpecTest
   public void testShardSpecLookup()
   {
     final List<ShardSpec> shardSpecs = ImmutableList.of(
-        new RangeBucketShardSpec(0, "dim", null, "c"),
-        new RangeBucketShardSpec(1, "dim", "f", "i"),
-        new RangeBucketShardSpec(2, "dim", "i", null)
+        new SingleDimensionRangeBucketShardSpec(0, "dim", null, "c"),
+        new SingleDimensionRangeBucketShardSpec(1, "dim", "f", "i"),
+        new SingleDimensionRangeBucketShardSpec(2, "dim", "i", null)
     );
     final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
     final long currentTime = DateTimes.nowUtc().getMillis();
@@ -99,17 +99,17 @@ public class RangeBucketShardSpecTest
   public void testSerde() throws JsonProcessingException
   {
     final ObjectMapper mapper = ShardSpecTestUtils.initObjectMapper();
-    mapper.registerSubtypes(new NamedType(RangeBucketShardSpec.class, RangeBucketShardSpec.TYPE));
+    mapper.registerSubtypes(new NamedType(SingleDimensionRangeBucketShardSpec.class, SingleDimensionRangeBucketShardSpec.TYPE));
     mapper.setInjectableValues(new Std().addValue(ObjectMapper.class, mapper));
-    final RangeBucketShardSpec original = new RangeBucketShardSpec(1, "dim", "start", "end");
+    final SingleDimensionRangeBucketShardSpec original = new SingleDimensionRangeBucketShardSpec(1, "dim", "start", "end");
     final String json = mapper.writeValueAsString(original);
-    final RangeBucketShardSpec fromJson = (RangeBucketShardSpec) mapper.readValue(json, ShardSpec.class);
+    final SingleDimensionRangeBucketShardSpec fromJson = (SingleDimensionRangeBucketShardSpec) mapper.readValue(json, ShardSpec.class);
     Assert.assertEquals(original, fromJson);
   }
 
   @Test
   public void testEquals()
   {
-    EqualsVerifier.forClass(RangeBucketShardSpec.class).usingGetClass().verify();
+    EqualsVerifier.forClass(SingleDimensionRangeBucketShardSpec.class).usingGetClass().verify();
   }
 }
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java
index a579245..1a05f12 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/SingleDimensionShardSpecTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.timeline.partition;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -33,6 +34,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +43,8 @@ import java.util.Map;
  */
 public class SingleDimensionShardSpecTest
 {
+  private final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   @Test
   public void testIsInChunk()
   {
@@ -150,6 +154,22 @@ public class SingleDimensionShardSpecTest
     Assert.assertFalse(shardSpec.sharePartitionSpace(new NumberedOverwritePartialShardSpec(0, 2, 1)));
   }
 
+  @Test
+  public void testSerde() throws IOException
+  {
+    testSerde(new SingleDimensionShardSpec("dim", null, null, 10, null));
+    testSerde(new SingleDimensionShardSpec("dim", "abc", null, 5, 10));
+    testSerde(new SingleDimensionShardSpec("dim", null, "xyz", 10, 1));
+    testSerde(new SingleDimensionShardSpec("dim", "abc", "xyz", 10, null));
+  }
+
+  private void testSerde(SingleDimensionShardSpec shardSpec) throws IOException
+  {
+    String json = OBJECT_MAPPER.writeValueAsString(shardSpec);
+    SingleDimensionShardSpec deserializedSpec = OBJECT_MAPPER.readValue(json, SingleDimensionShardSpec.class);
+    Assert.assertEquals(shardSpec, deserializedSpec);
+  }
+
   private static RangeSet<String> rangeSet(List<Range<String>> ranges)
   {
     ImmutableRangeSet.Builder<String> builder = ImmutableRangeSet.builder();
diff --git a/core/src/test/java/org/apache/druid/timeline/partition/StringPartitionChunkTest.java b/core/src/test/java/org/apache/druid/timeline/partition/StringPartitionChunkTest.java
index e3f9846..63a8c27 100644
--- a/core/src/test/java/org/apache/druid/timeline/partition/StringPartitionChunkTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/partition/StringPartitionChunkTest.java
@@ -19,92 +19,196 @@
 
 package org.apache.druid.timeline.partition;
 
+import org.apache.druid.data.input.StringTuple;
 import org.junit.Assert;
 import org.junit.Test;
 
+
 public class StringPartitionChunkTest
 {
   @Test
   public void testAbuts()
   {
-    StringPartitionChunk<Integer> lhs = StringPartitionChunk.make(null, "10", 0, 1);
+    // Test with multiple dimensions
+    StringPartitionChunk<Integer> lhs = StringPartitionChunk.make(null, StringTuple.create("10", "abc"), 0, 1);
 
-    Assert.assertTrue(lhs.abuts(StringPartitionChunk.make("10", null, 1, 2)));
-    Assert.assertFalse(lhs.abuts(StringPartitionChunk.make("11", null, 2, 3)));
+    Assert.assertTrue(lhs.abuts(StringPartitionChunk.make(StringTuple.create("10", "abc"), null, 1, 2)));
+    Assert.assertFalse(lhs.abuts(StringPartitionChunk.make(StringTuple.create("10", "xyz"), null, 2, 3)));
+    Assert.assertFalse(lhs.abuts(StringPartitionChunk.make(StringTuple.create("11", "abc"), null, 2, 3)));
     Assert.assertFalse(lhs.abuts(StringPartitionChunk.make(null, null, 3, 4)));
 
+    // Test with single dimension
+    lhs = StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1);
+
+    Assert.assertTrue(lhs.abuts(StringPartitionChunk.makeForSingleDimension("10", null, 1, 2)));
+    Assert.assertFalse(lhs.abuts(StringPartitionChunk.makeForSingleDimension("11", null, 2, 3)));
+    Assert.assertFalse(lhs.abuts(StringPartitionChunk.makeForSingleDimension(null, null, 3, 4)));
+
     Assert.assertFalse(StringPartitionChunk.make(null, null, 0, 1).abuts(StringPartitionChunk.make(null, null, 1, 2)));
   }
 
   @Test
   public void testIsStart()
   {
-    Assert.assertTrue(StringPartitionChunk.make(null, "10", 0, 1).isStart());
-    Assert.assertFalse(StringPartitionChunk.make("10", null, 0, 1).isStart());
-    Assert.assertFalse(StringPartitionChunk.make("10", "11", 0, 1).isStart());
-    Assert.assertTrue(StringPartitionChunk.make(null, null, 0, 1).isStart());
+    // Test with multiple dimensions
+    Assert.assertTrue(StringPartitionChunk.make(null, StringTuple.create("10", "abc"), 0, 1).isStart());
+    Assert.assertFalse(StringPartitionChunk.make(StringTuple.create("10", "abc"), null, 0, 1).isStart());
+    Assert.assertFalse(
+        StringPartitionChunk.make(
+            StringTuple.create("10", "abc"),
+            StringTuple.create("11", "def"),
+            0,
+            1
+        ).isStart()
+    );
+
+    // Test with a single dimension
+    Assert.assertTrue(StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1).isStart());
+    Assert.assertFalse(StringPartitionChunk.makeForSingleDimension("10", null, 0, 1).isStart());
+    Assert.assertFalse(StringPartitionChunk.makeForSingleDimension("10", "11", 0, 1).isStart());
+    Assert.assertTrue(StringPartitionChunk.makeForSingleDimension(null, null, 0, 1).isStart());
   }
 
   @Test
   public void testIsEnd()
   {
-    Assert.assertFalse(StringPartitionChunk.make(null, "10", 0, 1).isEnd());
-    Assert.assertTrue(StringPartitionChunk.make("10", null, 0, 1).isEnd());
-    Assert.assertFalse(StringPartitionChunk.make("10", "11", 0, 1).isEnd());
-    Assert.assertTrue(StringPartitionChunk.make(null, null, 0, 1).isEnd());
+    // Test with multiple dimensions
+    Assert.assertFalse(StringPartitionChunk.make(null, StringTuple.create("10", "abc"), 0, 1).isEnd());
+    Assert.assertTrue(StringPartitionChunk.make(StringTuple.create("10", "abc"), null, 0, 1).isEnd());
+    Assert.assertFalse(
+        StringPartitionChunk.make(
+            StringTuple.create("10", "abc"),
+            StringTuple.create("11", "def"),
+            0,
+            1
+        ).isEnd()
+    );
+
+    // Test with a single dimension
+    Assert.assertFalse(StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1).isEnd());
+    Assert.assertTrue(StringPartitionChunk.makeForSingleDimension("10", null, 0, 1).isEnd());
+    Assert.assertFalse(StringPartitionChunk.makeForSingleDimension("10", "11", 0, 1).isEnd());
+    Assert.assertTrue(StringPartitionChunk.makeForSingleDimension(null, null, 0, 1).isEnd());
   }
 
   @Test
   public void testCompareTo()
   {
+    // Test with multiple dimensions
     Assert.assertEquals(
         0,
-        StringPartitionChunk.make(null, null, 0, 1)
-                            .compareTo(StringPartitionChunk.make(null, null, 0, 2))
+        StringPartitionChunk.make(null, null, 0, 1).compareTo(
+            StringPartitionChunk.make(null, null, 0, 2)
+        )
     );
     Assert.assertEquals(
         0,
-        StringPartitionChunk.make("10", null, 0, 1)
-                            .compareTo(StringPartitionChunk.make("10", null, 0, 2))
+        StringPartitionChunk.make(StringTuple.create("10", "abc"), null, 0, 1).compareTo(
+            StringPartitionChunk.make(StringTuple.create("10", "abc"), null, 0, 2)
+        )
     );
     Assert.assertEquals(
         0,
-        StringPartitionChunk.make(null, "10", 1, 1)
-                            .compareTo(StringPartitionChunk.make(null, "10", 1, 2))
+        StringPartitionChunk.make(null, StringTuple.create("10", "abc"), 1, 1).compareTo(
+            StringPartitionChunk.make(null, StringTuple.create("10", "abc"), 1, 2)
+        )
     );
     Assert.assertEquals(
         0,
-        StringPartitionChunk.make("10", "11", 1, 1)
-                            .compareTo(StringPartitionChunk.make("10", "11", 1, 2))
+        StringPartitionChunk.make(StringTuple.create("10", "abc"), StringTuple.create("11", "aa"), 1, 1).compareTo(
+            StringPartitionChunk.make(StringTuple.create("10", "abc"), StringTuple.create("11", "aa"), 1, 2)
+        )
     );
     Assert.assertEquals(
         -1,
-        StringPartitionChunk.make(null, "10", 0, 1)
-                            .compareTo(StringPartitionChunk.make("10", null, 1, 2))
+        StringPartitionChunk.make(null, StringTuple.create("10", "abc"), 0, 1).compareTo(
+            StringPartitionChunk.make(StringTuple.create("10", "abc"), null, 1, 2)
+        )
     );
     Assert.assertEquals(
         -1,
-        StringPartitionChunk.make("11", "20", 0, 1)
-                            .compareTo(StringPartitionChunk.make("20", "33", 1, 1))
+        StringPartitionChunk.make(StringTuple.create("11", "b"), StringTuple.create("20", "a"), 0, 1).compareTo(
+            StringPartitionChunk.make(StringTuple.create("20", "a"), StringTuple.create("33", "z"), 1, 1)
+        )
     );
     Assert.assertEquals(
         1,
-        StringPartitionChunk.make("20", "33", 1, 1)
-                            .compareTo(StringPartitionChunk.make("11", "20", 0, 1))
+        StringPartitionChunk.make(StringTuple.create("20", "a"), StringTuple.create("33", "z"), 1, 1).compareTo(
+            StringPartitionChunk.make(StringTuple.create("11", "b"), StringTuple.create("20", "a"), 0, 1)
+        )
+    );
+
+    // Test with a single dimension
+    Assert.assertEquals(
+        0,
+        StringPartitionChunk.makeForSingleDimension(null, null, 0, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension(null, null, 0, 2))
+    );
+    Assert.assertEquals(
+        0,
+        StringPartitionChunk.makeForSingleDimension("10", null, 0, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension("10", null, 0, 2))
+    );
+    Assert.assertEquals(
+        0,
+        StringPartitionChunk.makeForSingleDimension(null, "10", 1, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension(null, "10", 1, 2))
+    );
+    Assert.assertEquals(
+        0,
+        StringPartitionChunk.makeForSingleDimension("10", "11", 1, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension("10", "11", 1, 2))
+    );
+    Assert.assertEquals(
+        -1,
+        StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension("10", null, 1, 2))
+    );
+    Assert.assertEquals(
+        -1,
+        StringPartitionChunk.makeForSingleDimension("11", "20", 0, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension("20", "33", 1, 1))
     );
     Assert.assertEquals(
         1,
-        StringPartitionChunk.make("10", null, 1, 1)
-                            .compareTo(StringPartitionChunk.make(null, "10", 0, 1))
+        StringPartitionChunk.makeForSingleDimension("20", "33", 1, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension("11", "20", 0, 1))
+    );
+    Assert.assertEquals(
+        1,
+        StringPartitionChunk.makeForSingleDimension("10", null, 1, 1)
+                            .compareTo(StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1))
     );
   }
 
   @Test
   public void testEquals()
   {
-    Assert.assertEquals(StringPartitionChunk.make(null, null, 0, 1), StringPartitionChunk.make(null, null, 0, 1));
-    Assert.assertEquals(StringPartitionChunk.make(null, "10", 0, 1), StringPartitionChunk.make(null, "10", 0, 1));
-    Assert.assertEquals(StringPartitionChunk.make("10", null, 0, 1), StringPartitionChunk.make("10", null, 0, 1));
-    Assert.assertEquals(StringPartitionChunk.make("10", "11", 0, 1), StringPartitionChunk.make("10", "11", 0, 1));
+    Assert.assertEquals(
+        StringPartitionChunk.makeForSingleDimension(null, null, 0, 1),
+        StringPartitionChunk.makeForSingleDimension(null, null, 0, 1)
+    );
+    Assert.assertEquals(
+        StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1),
+        StringPartitionChunk.makeForSingleDimension(null, "10", 0, 1)
+    );
+    Assert.assertEquals(
+        StringPartitionChunk.makeForSingleDimension("10", null, 0, 1),
+        StringPartitionChunk.makeForSingleDimension("10", null, 0, 1)
+    );
+    Assert.assertEquals(
+        StringPartitionChunk.makeForSingleDimension("10", "11", 0, 1),
+        StringPartitionChunk.makeForSingleDimension("10", "11", 0, 1)
+    );
+  }
+
+  @Test
+  public void testMakeForSingleDimension()
+  {
+    StringPartitionChunk<Integer> chunk = StringPartitionChunk
+        .makeForSingleDimension("a", null, 0, 1);
+    Assert.assertEquals(0, chunk.getChunkNumber());
+    Assert.assertTrue(chunk.isEnd());
+    Assert.assertFalse(chunk.isStart());
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 201a48e..ec5a5df 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -39,9 +39,9 @@ import org.apache.druid.data.input.InputSource;
 import org.apache.druid.indexer.IngestionState;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
 import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
 import org.apache.druid.indexing.common.TaskReport;
@@ -529,7 +529,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 
   private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
   {
-    return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+    return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof DimensionRangePartitionsSpec;
   }
 
   private boolean isParallelMode()
@@ -900,8 +900,8 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     distributions.forEach(distributionMerger::merge);
     StringDistribution mergedDistribution = distributionMerger.getResult();
 
-    SingleDimensionPartitionsSpec partitionsSpec =
-        (SingleDimensionPartitionsSpec) ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec();
+    DimensionRangePartitionsSpec partitionsSpec =
+        (DimensionRangePartitionsSpec) ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec();
 
     final PartitionBoundaries partitions;
     Integer targetRowsPerSegment = partitionsSpec.getTargetRowsPerSegment();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index f474c1e..af90693 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.IndexSpec;
@@ -34,6 +34,7 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
+import java.util.List;
 import java.util.Objects;
 
 public class ParallelIndexTuningConfig extends IndexTuningConfig
@@ -199,9 +200,10 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
     Preconditions.checkArgument(this.maxNumConcurrentSubTasks > 0, "maxNumConcurrentSubTasks must be positive");
     Preconditions.checkArgument(this.maxNumSegmentsToMerge > 0, "maxNumSegmentsToMerge must be positive");
     Preconditions.checkArgument(this.totalNumMergeTasks > 0, "totalNumMergeTasks must be positive");
-    if (getPartitionsSpec() != null && getPartitionsSpec() instanceof SingleDimensionPartitionsSpec) {
-      if (((SingleDimensionPartitionsSpec) getPartitionsSpec()).getPartitionDimension() == null) {
-        throw new IAE("partitionDimension must be specified");
+    if (getPartitionsSpec() != null && getPartitionsSpec() instanceof DimensionRangePartitionsSpec) {
+      List<String> partitionDimensions = ((DimensionRangePartitionsSpec) getPartitionsSpec()).getPartitionDimensions();
+      if (partitionDimensions == null || partitionDimensions.isEmpty()) {
+        throw new IAE("partitionDimensions must be specified");
       }
     }
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
index 41f9bf6..d0cce9d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java
@@ -32,8 +32,9 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.Rows;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SurrogateTaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -130,9 +131,9 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     );
 
     Preconditions.checkArgument(
-        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof SingleDimensionPartitionsSpec,
+        ingestionSchema.getTuningConfig().getPartitionsSpec() instanceof DimensionRangePartitionsSpec,
         "%s partitionsSpec required",
-        SingleDimensionPartitionsSpec.NAME
+        DimensionRangePartitionsSpec.NAME
     );
 
     this.subtaskSpecId = subtaskSpecId;
@@ -193,10 +194,13 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     GranularitySpec granularitySpec = dataSchema.getGranularitySpec();
     ParallelIndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
 
-    SingleDimensionPartitionsSpec partitionsSpec = (SingleDimensionPartitionsSpec) tuningConfig.getPartitionsSpec();
+    DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) tuningConfig.getPartitionsSpec();
     Preconditions.checkNotNull(partitionsSpec, "partitionsSpec required in tuningConfig");
-    String partitionDimension = partitionsSpec.getPartitionDimension();
-    Preconditions.checkNotNull(partitionDimension, "partitionDimension required in partitionsSpec");
+    final List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
+    Preconditions.checkArgument(
+        partitionDimensions != null && !partitionDimensions.isEmpty(),
+        "partitionDimension required in partitionsSpec"
+    );
     boolean isAssumeGrouped = partitionsSpec.isAssumeGrouped();
 
     InputSource inputSource = ingestionSchema.getIOConfig().getNonNullInputSource(
@@ -225,7 +229,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
             parseExceptionHandler
         );
         HandlingInputRowIterator iterator =
-            new RangePartitionIndexTaskInputRowIteratorBuilder(partitionDimension, SKIP_NULL)
+            new RangePartitionIndexTaskInputRowIteratorBuilder(partitionDimensions, SKIP_NULL)
                 .delegate(inputRowIterator)
                 .granularitySpec(granularitySpec)
                 .build()
@@ -233,7 +237,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
       Map<Interval, StringDistribution> distribution = determineDistribution(
           iterator,
           granularitySpec,
-          partitionDimension,
+          partitionDimensions,
           isAssumeGrouped
       );
       sendReport(toolbox, new DimensionDistributionReport(getId(), distribution));
@@ -245,7 +249,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
   private Map<Interval, StringDistribution> determineDistribution(
       HandlingInputRowIterator inputRowIterator,
       GranularitySpec granularitySpec,
-      String partitionDimension,
+      List<String> partitionDimensions,
       boolean isAssumeGrouped
   )
   {
@@ -270,12 +274,18 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
         assert optInterval.isPresent();
         interval = optInterval.get();
       }
-      String partitionDimensionValue = Iterables.getOnlyElement(inputRow.getDimension(partitionDimension));
+      String[] values = new String[partitionDimensions.size()];
+      for (int i = 0; i < partitionDimensions.size(); ++i) {
+        values[i] = Iterables.getOnlyElement(
+            inputRow.getDimension(partitionDimensions.get(i))
+        );
+      }
+      final StringTuple partitionDimensionValues = StringTuple.create(values);
 
-      if (inputRowFilter.accept(interval, partitionDimensionValue, inputRow)) {
+      if (inputRowFilter.accept(interval, partitionDimensionValues, inputRow)) {
         StringDistribution stringDistribution =
             intervalToDistribution.computeIfAbsent(interval, k -> new StringSketch());
-        stringDistribution.put(partitionDimensionValue);
+        stringDistribution.put(partitionDimensionValues);
       }
     }
 
@@ -306,17 +316,17 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     /**
      * @return True if input row should be accepted, else false
      */
-    boolean accept(Interval interval, String partitionDimensionValue, InputRow inputRow);
+    boolean accept(Interval interval, StringTuple partitionDimensionValues, InputRow inputRow);
 
     /**
      * @return Minimum partition dimension value for each interval processed so far.
      */
-    Map<Interval, String> getIntervalToMinPartitionDimensionValue();
+    Map<Interval, StringTuple> getIntervalToMinPartitionDimensionValue();
 
     /**
      * @return Maximum partition dimension value for each interval processed so far.
      */
-    Map<Interval, String> getIntervalToMaxPartitionDimensionValue();
+    Map<Interval, StringTuple> getIntervalToMaxPartitionDimensionValue();
   }
 
   /**
@@ -362,7 +372,7 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     }
 
     @Override
-    public boolean accept(Interval interval, String partitionDimensionValue, InputRow inputRow)
+    public boolean accept(Interval interval, StringTuple partitionDimensionValue, InputRow inputRow)
     {
       delegate.accept(interval, partitionDimensionValue, inputRow);
 
@@ -384,13 +394,13 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     }
 
     @Override
-    public Map<Interval, String> getIntervalToMinPartitionDimensionValue()
+    public Map<Interval, StringTuple> getIntervalToMinPartitionDimensionValue()
     {
       return delegate.getIntervalToMinPartitionDimensionValue();
     }
 
     @Override
-    public Map<Interval, String> getIntervalToMaxPartitionDimensionValue()
+    public Map<Interval, StringTuple> getIntervalToMaxPartitionDimensionValue()
     {
       return delegate.getIntervalToMaxPartitionDimensionValue();
     }
@@ -402,8 +412,8 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
    */
   private static class PassthroughInputRowFilter implements InputRowFilter
   {
-    private final Map<Interval, String> intervalToMinDimensionValue;
-    private final Map<Interval, String> intervalToMaxDimensionValue;
+    private final Map<Interval, StringTuple> intervalToMinDimensionValue;
+    private final Map<Interval, StringTuple> intervalToMaxDimensionValue;
 
     PassthroughInputRowFilter()
     {
@@ -412,19 +422,19 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     }
 
     @Override
-    public boolean accept(Interval interval, String partitionDimensionValue, InputRow inputRow)
+    public boolean accept(Interval interval, StringTuple partitionDimensionValue, InputRow inputRow)
     {
       updateMinDimensionValue(interval, partitionDimensionValue);
       updateMaxDimensionValue(interval, partitionDimensionValue);
       return true;
     }
 
-    private void updateMinDimensionValue(Interval interval, String dimensionValue)
+    private void updateMinDimensionValue(Interval interval, StringTuple dimensionValue)
     {
       intervalToMinDimensionValue.compute(
           interval,
           (intervalKey, currentMinValue) -> {
-            if (currentMinValue == null || dimensionValue.compareTo(currentMinValue) < 0) {
+            if (currentMinValue == null || currentMinValue.compareTo(dimensionValue) > 0) {
               return dimensionValue;
             } else {
               return currentMinValue;
@@ -433,12 +443,12 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
       );
     }
 
-    private void updateMaxDimensionValue(Interval interval, String dimensionValue)
+    private void updateMaxDimensionValue(Interval interval, StringTuple dimensionValue)
     {
       intervalToMaxDimensionValue.compute(
           interval,
           (intervalKey, currentMaxValue) -> {
-            if (currentMaxValue == null || dimensionValue.compareTo(currentMaxValue) > 0) {
+            if (currentMaxValue == null || currentMaxValue.compareTo(dimensionValue) < 0) {
               return dimensionValue;
             } else {
               return currentMaxValue;
@@ -448,13 +458,13 @@ public class PartialDimensionDistributionTask extends PerfectRollupWorkerTask
     }
 
     @Override
-    public Map<Interval, String> getIntervalToMinPartitionDimensionValue()
+    public Map<Interval, StringTuple> getIntervalToMinPartitionDimensionValue()
     {
       return intervalToMinDimensionValue;
     }
 
     @Override
-    public Map<Interval, String> getIntervalToMaxPartitionDimensionValue()
+    public Map<Interval, StringTuple> getIntervalToMaxPartitionDimensionValue()
     {
       return intervalToMaxDimensionValue;
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
index 61287e0..f36f4e8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
 
 /**
  * The worker task of {@link PartialRangeSegmentGenerateParallelIndexTaskRunner}. This task partitions input data by
- * ranges of the partition dimension specified in {@link SingleDimensionPartitionsSpec}. Partitioned segments are stored
+ * ranges of the partition dimensions specified in {@link DimensionRangePartitionsSpec}. Partitioned segments are stored
  * in local storage using {@link ShuffleDataSegmentPusher}.
  */
 public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<GeneratedPartitionsMetadataReport>
@@ -82,7 +83,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
         supervisorTaskId,
         ingestionSchema,
         context,
-        new RangePartitionIndexTaskInputRowIteratorBuilder(getPartitionDimension(ingestionSchema), !SKIP_NULL)
+        new RangePartitionIndexTaskInputRowIteratorBuilder(getPartitionDimensions(ingestionSchema), !SKIP_NULL)
     );
 
     this.subtaskSpecId = subtaskSpecId;
@@ -92,20 +93,21 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
     this.intervalToPartitions = intervalToPartitions;
   }
 
-  private static String getPartitionDimension(ParallelIndexIngestionSpec ingestionSpec)
+  private static List<String> getPartitionDimensions(ParallelIndexIngestionSpec ingestionSpec)
   {
     PartitionsSpec partitionsSpec = ingestionSpec.getTuningConfig().getPartitionsSpec();
     Preconditions.checkArgument(
-        partitionsSpec instanceof SingleDimensionPartitionsSpec,
-        "%s partitionsSpec required",
+        partitionsSpec instanceof DimensionRangePartitionsSpec,
+        "%s or %s partitionsSpec required",
+        DimensionRangePartitionsSpec.NAME,
         SingleDimensionPartitionsSpec.NAME
     );
 
-    SingleDimensionPartitionsSpec singleDimPartitionsSpec = (SingleDimensionPartitionsSpec) partitionsSpec;
-    String partitionDimension = singleDimPartitionsSpec.getPartitionDimension();
-    Preconditions.checkNotNull(partitionDimension, "partitionDimension required");
+    DimensionRangePartitionsSpec multiDimPartitionsSpec = (DimensionRangePartitionsSpec) partitionsSpec;
+    List<String> partitionDimensions = multiDimPartitionsSpec.getPartitionDimensions();
+    Preconditions.checkNotNull(partitionDimensions, "partitionDimension required");
 
-    return partitionDimension;
+    return partitionDimensions;
   }
 
   @JsonProperty
@@ -159,7 +161,7 @@ public class PartialRangeSegmentGenerateTask extends PartialSegmentGenerateTask<
       throws IOException
   {
     final RangePartitionAnalysis partitionAnalysis = new RangePartitionAnalysis(
-        (SingleDimensionPartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec()
+        (DimensionRangePartitionsSpec) ingestionSchema.getTuningConfig().getPartitionsSpec()
     );
     intervalToPartitions.forEach(partitionAnalysis::updateBucket);
     return SegmentAllocators.forNonLinearPartitioning(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
new file mode 100644
index 0000000..1035acb
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/ArrayOfStringTuplesSerDe.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel.distribution;
+
+import org.apache.datasketches.ArrayOfItemsSerDe;
+import org.apache.datasketches.ArrayOfStringsSerDe;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.datasketches.memory.internal.UnsafeUtil;
+import org.apache.druid.data.input.StringTuple;
+
+/**
+ * Serde for arrays of {@link StringTuple}, delegating the string contents to {@link ArrayOfStringsSerDe}.
+ * <p>
+ * Per-tuple layout: int number of items, int byte length of the content, then the content bytes.
+ */
+public class ArrayOfStringTuplesSerDe extends ArrayOfItemsSerDe<StringTuple>
+{
+  private static final ArrayOfStringsSerDe STRINGS_SERDE = new ArrayOfStringsSerDe();
+
+  @Override
+  public byte[] serializeToByteArray(StringTuple[] items)
+  {
+    int length = 0;
+    final byte[][] itemsBytes = new byte[items.length][];
+    for (int i = 0; i < items.length; i++) {
+      // Serialize each tuple's strings first so the total output size is known before allocating
+      itemsBytes[i] = STRINGS_SERDE.serializeToByteArray(items[i].toArray());
+
+      // Each tuple contributes two int headers (item count, content size) plus its content bytes
+      length += Integer.BYTES + Integer.BYTES + itemsBytes[i].length;
+    }
+
+    final byte[] bytes = new byte[length];
+    final WritableMemory mem = WritableMemory.writableWrap(bytes);
+    long offsetBytes = 0;
+    for (int i = 0; i < items.length; i++) {
+      // First header: the number of items in the StringTuple
+      mem.putInt(offsetBytes, items[i].size());
+      offsetBytes += Integer.BYTES;
+
+      // Second header: the size in bytes of the serialized content of the StringTuple
+      mem.putInt(offsetBytes, itemsBytes[i].length);
+      offsetBytes += Integer.BYTES;
+
+      // Content: the serialized strings of the StringTuple
+      mem.putByteArray(offsetBytes, itemsBytes[i], 0, itemsBytes[i].length);
+      offsetBytes += itemsBytes[i].length;
+    }
+    return bytes;
+  }
+
+  @Override
+  public StringTuple[] deserializeFromMemory(Memory mem, int numItems)
+  {
+    final StringTuple[] array = new StringTuple[numItems];
+    long offsetBytes = 0;
+    for (int i = 0; i < numItems; i++) {
+      // Read the number of items in the StringTuple (bounds are checked before every read)
+      UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      final int numItemsInTuple = mem.getInt(offsetBytes);
+      offsetBytes += Integer.BYTES;
+
+      // Read the size in bytes of the serialized content
+      UnsafeUtil.checkBounds(offsetBytes, Integer.BYTES, mem.getCapacity());
+      final int byteContentSize = mem.getInt(offsetBytes);
+      offsetBytes += Integer.BYTES;
+
+      // Copy the content bytes into their own array so they can be wrapped and decoded separately
+      final byte[] byteContent = new byte[byteContentSize];
+      UnsafeUtil.checkBounds(offsetBytes, byteContentSize, mem.getCapacity());
+      mem.getByteArray(offsetBytes, byteContent, 0, byteContentSize);
+      offsetBytes += byteContentSize;
+
+      // Decode the content bytes into numItemsInTuple strings and rebuild the StringTuple
+      array[i] = StringTuple.create(
+          STRINGS_SERDE.deserializeFromMemory(Memory.wrap(byteContent), numItemsInTuple)
+      );
+    }
+    return array;
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringDistribution.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringDistribution.java
index c80f70f..1c94700 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringDistribution.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringDistribution.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 
 /**
@@ -35,17 +36,17 @@ public interface StringDistribution
   /**
    * Record occurrence of {@link String}
    */
-  void put(String element);
+  void put(StringTuple element);
 
   /**
    * Record occurrence of {@link String} if it will become the new minimum element.
    */
-  void putIfNewMin(String element);
+  void putIfNewMin(StringTuple element);
 
   /**
    * Record occurrence of {@link String} if it will become the new maximum element;
    */
-  void putIfNewMax(String element);
+  void putIfNewMax(StringTuple element);
 
   /**
    * Split the distribution in the fewest number of evenly-sized partitions while honoring a max
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
index 70ee85a..34e4af9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
@@ -31,9 +31,9 @@ import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
 import com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.datasketches.ArrayOfStringsSerDe;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.quantiles.ItemsSketch;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 
 import java.io.IOException;
@@ -49,42 +49,42 @@ public class StringSketch implements StringDistribution
 {
   static final String NAME = "sketch";
   static final int SKETCH_K = 1 << 12;  // smallest value with normalized rank error < 0.1%; retain up to ~86k elements
-  static final Comparator<String> SKETCH_COMPARATOR = Comparator.naturalOrder();
-  private static final ArrayOfStringsSerDe ARRAY_OF_STRINGS_SERDE = new ArrayOfStringsSerDe();
+  static final Comparator<StringTuple> STRING_TUPLE_COMPARATOR = Comparator.naturalOrder();
+  private static final ArrayOfStringTuplesSerDe ARRAY_OF_STRINGS_SERDE = new ArrayOfStringTuplesSerDe();
 
-  private final ItemsSketch<String> delegate;
+  private final ItemsSketch<StringTuple> delegate;
 
   public StringSketch()
   {
-    this(ItemsSketch.getInstance(SKETCH_K, SKETCH_COMPARATOR));
+    this(ItemsSketch.getInstance(SKETCH_K, STRING_TUPLE_COMPARATOR));
   }
 
-  StringSketch(ItemsSketch<String> sketch)
+  StringSketch(ItemsSketch<StringTuple> sketch)
   {
     this.delegate = sketch;
   }
 
   @Override
-  public void put(String string)
+  public void put(StringTuple string)
   {
     delegate.update(string);
   }
 
   @Override
-  public void putIfNewMin(String string)
+  public void putIfNewMin(StringTuple value)
   {
-    String min = delegate.getMinValue();
-    if (min == null || string.compareTo(min) < 0) {
-      delegate.update(string);
+    StringTuple min = delegate.getMinValue();
+    if (min == null || min.compareTo(value) > 0) {
+      delegate.update(value);
     }
   }
 
   @Override
-  public void putIfNewMax(String string)
+  public void putIfNewMax(StringTuple value)
   {
-    String max = delegate.getMaxValue();
-    if (max == null || string.compareTo(max) > 0) {
-      delegate.update(string);
+    StringTuple max = delegate.getMaxValue();
+    if (max == null || max.compareTo(value) < 0) {
+      delegate.update(value);
     }
   }
 
@@ -109,13 +109,13 @@ public class StringSketch implements StringDistribution
   }
 
   @VisibleForTesting
-  public String getMin()
+  public StringTuple getMin()
   {
     return delegate.getMinValue();
   }
 
   @VisibleForTesting
-  public String getMax()
+  public StringTuple getMax()
   {
     return delegate.getMaxValue();
   }
@@ -127,8 +127,8 @@ public class StringSketch implements StringDistribution
         "evenPartitionCount must be positive but is %s",
         evenPartitionCount
     );
-    String[] partitions = delegate.getQuantiles(evenPartitionCount + 1); // add 1 since this returns endpoints
-    return new PartitionBoundaries((partitions == null) ? new String[0] : partitions);
+    StringTuple[] partitions = delegate.getQuantiles(evenPartitionCount + 1); // add 1 since this returns endpoints
+    return new PartitionBoundaries((partitions == null) ? new StringTuple[0] : partitions);
   }
 
   @Override
@@ -173,7 +173,7 @@ public class StringSketch implements StringDistribution
     );
   }
 
-  ItemsSketch<String> getDelegate()
+  ItemsSketch<StringTuple> getDelegate()
   {
     return delegate;
   }
@@ -231,9 +231,9 @@ public class StringSketch implements StringDistribution
       {
         JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
         byte[] sketchBytes = jsonNode.get(FIELD_SKETCH).binaryValue();
-        ItemsSketch<String> sketch = ItemsSketch.getInstance(
+        ItemsSketch<StringTuple> sketch = ItemsSketch.getInstance(
             Memory.wrap(sketchBytes),
-            SKETCH_COMPARATOR,
+            STRING_TUPLE_COMPARATOR,
             ARRAY_OF_STRINGS_SERDE
         );
         return new StringSketch(sketch);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMerger.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMerger.java
index 5637fc0..5fac43d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMerger.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMerger.java
@@ -20,17 +20,18 @@
 package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
 import org.apache.datasketches.quantiles.ItemsUnion;
+import org.apache.druid.data.input.StringTuple;
 
 /**
  * Merges {@link StringSketch}es.
  */
 public class StringSketchMerger implements StringDistributionMerger
 {
-  private final ItemsUnion<String> delegate;
+  private final ItemsUnion<StringTuple> delegate;
 
   public StringSketchMerger()
   {
-    delegate = ItemsUnion.getInstance(StringSketch.SKETCH_K, StringSketch.SKETCH_COMPARATOR);
+    delegate = ItemsUnion.getInstance(StringSketch.SKETCH_K, StringSketch.STRING_TUPLE_COMPARATOR);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
index 30d34bb..c377d37 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
@@ -46,17 +46,17 @@ public class RangePartitionIndexTaskInputRowIteratorBuilder implements IndexTask
   private final DefaultIndexTaskInputRowIteratorBuilder delegate;
 
   /**
-   * @param partitionDimension Create range partitions for this dimension
+   * @param partitionDimensions Create range partitions for these dimensions
    * @param skipNull Whether to skip rows with a dimension value of null
    */
-  public RangePartitionIndexTaskInputRowIteratorBuilder(String partitionDimension, boolean skipNull)
+  public RangePartitionIndexTaskInputRowIteratorBuilder(List<String> partitionDimensions, boolean skipNull)
   {
     delegate = new DefaultIndexTaskInputRowIteratorBuilder();
 
     if (skipNull) {
-      delegate.appendInputRowHandler(createOnlySingleDimensionValueRowsHandler(partitionDimension));
+      delegate.appendInputRowHandler(createOnlySingleDimensionValueRowsHandler(partitionDimensions));
     } else {
-      delegate.appendInputRowHandler(createOnlySingleOrNullDimensionValueRowsHandler(partitionDimension));
+      delegate.appendInputRowHandler(createOnlySingleOrNullDimensionValueRowsHandler(partitionDimensions));
     }
   }
 
@@ -79,36 +79,69 @@ public class RangePartitionIndexTaskInputRowIteratorBuilder implements IndexTask
   }
 
   private static HandlingInputRowIterator.InputRowHandler createOnlySingleDimensionValueRowsHandler(
-      String partitionDimension
+      List<String> partitionDimensions
   )
   {
     return inputRow -> {
-      int dimensionValueCount = getSingleOrNullDimensionValueCount(inputRow, partitionDimension);
-      return dimensionValueCount != 1;
+      // Rows with multiple dimension values should cause an exception
+      ensureNoMultiValuedDimensions(inputRow, partitionDimensions);
+
+      // Rows with empty dimension values should be marked handled
+      // and need not be processed further
+      return hasEmptyDimensions(inputRow, partitionDimensions);
     };
   }
 
   private static HandlingInputRowIterator.InputRowHandler createOnlySingleOrNullDimensionValueRowsHandler(
-      String partitionDimension
+      List<String> partitionDimensions
   )
   {
     return inputRow -> {
-      int dimensionValueCount = getSingleOrNullDimensionValueCount(inputRow, partitionDimension);
-      return dimensionValueCount > 1;  // Rows.objectToStrings() returns an empty list for a single null value
+      // Rows with multiple dimension values should cause an exception
+      ensureNoMultiValuedDimensions(inputRow, partitionDimensions);
+
+      // All other rows (single or null dimension values) need to be processed
+      // further and should not be marked as handled
+      return false;
     };
   }
 
-  private static int getSingleOrNullDimensionValueCount(InputRow inputRow, String partitionDimension)
+  /**
+   * Checks if the given InputRow has no value for any of the given partition dimensions.
+   */
+  private static boolean hasEmptyDimensions(InputRow inputRow, List<String> partitionDimensions)
+  {
+    for (String dimension : partitionDimensions) {
+      int dimensionValueCount = inputRow.getDimension(dimension).size();
+      if (dimensionValueCount == 0) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Verifies that the given InputRow has at most one value for each partition dimension.
+   *
+   * @throws IAE if any of the given partition dimensions has multiple values
+   *             in the given InputRow.
+   */
+  private static void ensureNoMultiValuedDimensions(
+      InputRow inputRow,
+      List<String> partitionDimensions
+  ) throws IAE
   {
-    List<String> dimensionValues = inputRow.getDimension(partitionDimension);
-    int dimensionValueCount = dimensionValues.size();
-    if (dimensionValueCount > 1) {
-      throw new IAE(
-          "Cannot partition on multi-value dimension [%s] for input row [%s]",
-          partitionDimension,
-          inputRow
-      );
+    for (String dimension : partitionDimensions) {
+      int dimensionValueCount = inputRow.getDimension(dimension).size();
+      if (dimensionValueCount > 1) {
+        throw new IAE(
+            "Cannot partition on multi-value dimension [%s] for input row [%s]",
+            dimension,
+            inputRow
+        );
+      }
     }
-    return dimensionValueCount;
   }
+
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java
index c8a2b88..8adbc2b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/partition/RangePartitionAnalysis.java
@@ -20,12 +20,12 @@
 package org.apache.druid.indexing.common.task.batch.partition;
 
 import com.google.common.collect.Maps;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
-import org.apache.druid.timeline.partition.RangeBucketShardSpec;
 import org.joda.time.Interval;
 
 import java.util.Collections;
@@ -38,18 +38,18 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 public class RangePartitionAnalysis
-    implements CompletePartitionAnalysis<PartitionBoundaries, SingleDimensionPartitionsSpec>
+    implements CompletePartitionAnalysis<PartitionBoundaries, DimensionRangePartitionsSpec>
 {
   private final Map<Interval, PartitionBoundaries> intervalToPartitionBoundaries = new HashMap<>();
-  private final SingleDimensionPartitionsSpec partitionsSpec;
+  private final DimensionRangePartitionsSpec partitionsSpec;
 
-  public RangePartitionAnalysis(SingleDimensionPartitionsSpec partitionsSpec)
+  public RangePartitionAnalysis(DimensionRangePartitionsSpec partitionsSpec)
   {
     this.partitionsSpec = partitionsSpec;
   }
 
   @Override
-  public SingleDimensionPartitionsSpec getPartitionsSpec()
+  public DimensionRangePartitionsSpec getPartitionsSpec()
   {
     return partitionsSpec;
   }
@@ -90,10 +90,10 @@ public class RangePartitionAnalysis
 
   /**
    * Translate {@link PartitionBoundaries} into the corresponding
-   * {@link SingleDimensionPartitionsSpec} with segment id.
+   * {@link DimensionRangeBucketShardSpec}s, one per bucket.
    */
   private static List<BucketNumberedShardSpec<?>> translatePartitionBoundaries(
-      String partitionDimension,
+      List<String> partitionDimensions,
       PartitionBoundaries partitionBoundaries
   )
   {
@@ -102,9 +102,9 @@ public class RangePartitionAnalysis
     }
 
     return IntStream.range(0, partitionBoundaries.size() - 1)
-                    .mapToObj(i -> new RangeBucketShardSpec(
+                    .mapToObj(i -> new DimensionRangeBucketShardSpec(
                         i,
-                        partitionDimension,
+                        partitionDimensions,
                         partitionBoundaries.get(i),
                         partitionBoundaries.get(i + 1)
                     ))
@@ -114,7 +114,7 @@ public class RangePartitionAnalysis
   @Override
   public Map<Interval, List<BucketNumberedShardSpec<?>>> createBuckets(TaskToolbox toolbox)
   {
-    final String partitionDimension = partitionsSpec.getPartitionDimension();
+    final List<String> partitionDimensions = partitionsSpec.getPartitionDimensions();
     final Map<Interval, List<BucketNumberedShardSpec<?>>> intervalToSegmentIds = Maps.newHashMapWithExpectedSize(
         getNumTimePartitions()
     );
@@ -122,7 +122,7 @@ public class RangePartitionAnalysis
     forEach((interval, partitionBoundaries) ->
                 intervalToSegmentIds.put(
                     interval,
-                    translatePartitionBoundaries(partitionDimension, partitionBoundaries)
+                    translatePartitionBoundaries(partitionDimensions, partitionBoundaries)
                 )
     );
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 1d4b03a..e9ec434 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -56,6 +57,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.NumberedOverwriteShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
@@ -254,6 +256,49 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   }
 
   @Test
+  public void testRunParallelWithMultiDimensionRangePartitioning() throws Exception
+  {
+    // Range partitioning is not supported with segment lock yet
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+    runIndexTask(null, true);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentCacheManagerFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(newTuningConfig(
+            new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+            2,
+            true
+        )).build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+    for (DataSegment segment : compactedSegments) {
+      // Expect compaction state to exist, since compaction state is stored by default
+      Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
+      CompactionState expectedState = new CompactionState(
+          new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+          compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+          getObjectMapper().readValue(
+              getObjectMapper().writeValueAsString(
+                  new UniformGranularitySpec(
+                      Granularities.HOUR,
+                      Granularities.MINUTE,
+                      true,
+                      ImmutableList.of(segment.getInterval())
+                  )
+              ),
+              Map.class
+          )
+      );
+      Assert.assertEquals(expectedState, segment.getLastCompactionState());
+    }
+  }
+
+  @Test
   public void testRunParallelWithRangePartitioningWithSingleTask() throws Exception
   {
     // Range partitioning is not supported with segment lock yet
@@ -294,6 +339,49 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   }
 
   @Test
+  public void testRunParallelWithMultiDimensionRangePartitioningWithSingleTask() throws Exception
+  {
+    // Range partitioning is not supported with segment lock yet
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+    runIndexTask(null, true);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentCacheManagerFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(newTuningConfig(
+            new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+            1,
+            true
+        )).build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+    for (DataSegment segment : compactedSegments) {
+      // Expect compaction state to exist, since compaction state is stored by default
+      Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
+      CompactionState expectedState = new CompactionState(
+          new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+          compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
+          getObjectMapper().readValue(
+              getObjectMapper().writeValueAsString(
+                  new UniformGranularitySpec(
+                      Granularities.HOUR,
+                      Granularities.MINUTE,
+                      true,
+                      ImmutableList.of(segment.getInterval())
+                  )
+              ),
+              Map.class
+          )
+      );
+      Assert.assertEquals(expectedState, segment.getLastCompactionState());
+    }
+  }
+
+  @Test
   public void testRunCompactionStateNotStoreIfContextSetToFalse()
   {
     runIndexTask(null, true);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java
index e841fba..dc3911f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RangePartitionCachingLocalSegmentAllocatorTest.java
@@ -22,7 +22,8 @@ package org.apache.druid.indexing.common.task;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.InputRow;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockListAction;
@@ -36,8 +37,8 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.appenderator.SegmentAllocator;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.DimensionRangeBucketShardSpec;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
-import org.apache.druid.timeline.partition.RangeBucketShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -60,6 +61,7 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
   private static final String TASKID = "taskid";
   private static final String SUPERVISOR_TASKID = "supervisor-taskid";
   private static final String PARTITION_DIMENSION = "dimension";
+  private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(PARTITION_DIMENSION);
   private static final Interval INTERVAL_EMPTY = Intervals.utc(0, 1000);
   private static final Interval INTERVAL_SINGLETON = Intervals.utc(1000, 2000);
   private static final Interval INTERVAL_NORMAL = Intervals.utc(2000, 3000);
@@ -68,9 +70,9 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
       INTERVAL_SINGLETON, "version-singleton",
       INTERVAL_NORMAL, "version-normal"
   );
-  private static final String PARTITION0 = "0";
-  private static final String PARTITION5 = "5";
-  private static final String PARTITION9 = "9";
+  private static final StringTuple PARTITION0 = StringTuple.create("0");
+  private static final StringTuple PARTITION5 = StringTuple.create("5");
+  private static final StringTuple PARTITION9 = StringTuple.create("9");
   private static final PartitionBoundaries EMPTY_PARTITIONS = new PartitionBoundaries();
   private static final PartitionBoundaries SINGLETON_PARTITIONS = new PartitionBoundaries(PARTITION0, PARTITION0);
   private static final PartitionBoundaries NORMAL_PARTITIONS = new PartitionBoundaries(
@@ -79,7 +81,7 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
       PARTITION9
   );
 
-  private static final Map<Interval, PartitionBoundaries> INTERVAL_TO_PARTITONS = ImmutableMap.of(
+  private static final Map<Interval, PartitionBoundaries> INTERVAL_TO_PARTITIONS = ImmutableMap.of(
       INTERVAL_EMPTY, EMPTY_PARTITIONS,
       INTERVAL_SINGLETON, SINGLETON_PARTITIONS,
       INTERVAL_NORMAL, NORMAL_PARTITIONS
@@ -101,9 +103,9 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
                            .collect(Collectors.toList())
     );
     final RangePartitionAnalysis partitionAnalysis = new RangePartitionAnalysis(
-        new SingleDimensionPartitionsSpec(null, 1, PARTITION_DIMENSION, false)
+        new DimensionRangePartitionsSpec(null, 1, PARTITION_DIMENSIONS, false)
     );
-    INTERVAL_TO_PARTITONS.forEach(partitionAnalysis::updateBucket);
+    INTERVAL_TO_PARTITIONS.forEach(partitionAnalysis::updateBucket);
     target = SegmentAllocators.forNonLinearPartitioning(
         toolbox,
         DATASOURCE,
@@ -150,7 +152,7 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
   {
     Interval interval = INTERVAL_NORMAL;
     InputRow row = createInputRow(interval, PARTITION9);
-    int partitionNum = INTERVAL_TO_PARTITONS.get(interval).size() - 2;
+    int partitionNum = INTERVAL_TO_PARTITIONS.get(interval).size() - 2;
     testAllocate(row, interval, partitionNum, null);
   }
 
@@ -168,37 +170,37 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
   @SuppressWarnings("SameParameterValue")
   private void testAllocate(InputRow row, Interval interval, int bucketId)
   {
-    String partitionEnd = getPartitionEnd(interval, bucketId);
+    StringTuple partitionEnd = getPartitionEnd(interval, bucketId);
     testAllocate(row, interval, bucketId, partitionEnd);
   }
 
   @Nullable
-  private static String getPartitionEnd(Interval interval, int bucketId)
+  private static StringTuple getPartitionEnd(Interval interval, int bucketId)
   {
-    PartitionBoundaries partitions = INTERVAL_TO_PARTITONS.get(interval);
+    PartitionBoundaries partitions = INTERVAL_TO_PARTITIONS.get(interval);
     boolean isLastPartition = (bucketId + 1) == partitions.size();
     return isLastPartition ? null : partitions.get(bucketId + 1);
   }
 
-  private void testAllocate(InputRow row, Interval interval, int bucketId, @Nullable String partitionEnd)
+  private void testAllocate(InputRow row, Interval interval, int bucketId, @Nullable StringTuple partitionEnd)
   {
-    String partitionStart = getPartitionStart(interval, bucketId);
+    StringTuple partitionStart = getPartitionStart(interval, bucketId);
     testAllocate(row, interval, bucketId, partitionStart, partitionEnd);
   }
 
   @Nullable
-  private static String getPartitionStart(Interval interval, int bucketId)
+  private static StringTuple getPartitionStart(Interval interval, int bucketId)
   {
     boolean isFirstPartition = bucketId == 0;
-    return isFirstPartition ? null : INTERVAL_TO_PARTITONS.get(interval).get(bucketId);
+    return isFirstPartition ? null : INTERVAL_TO_PARTITIONS.get(interval).get(bucketId);
   }
 
   private void testAllocate(
       InputRow row,
       Interval interval,
       int bucketId,
-      @Nullable String partitionStart,
-      @Nullable String partitionEnd
+      @Nullable StringTuple partitionStart,
+      @Nullable StringTuple partitionEnd
   )
   {
     String sequenceName = sequenceNameFunction.getSequenceName(interval, row);
@@ -208,8 +210,8 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
         SegmentId.of(DATASOURCE, interval, INTERVAL_TO_VERSION.get(interval), bucketId),
         segmentIdWithShardSpec.asSegmentId()
     );
-    RangeBucketShardSpec shardSpec = (RangeBucketShardSpec) segmentIdWithShardSpec.getShardSpec();
-    Assert.assertEquals(PARTITION_DIMENSION, shardSpec.getDimension());
+    DimensionRangeBucketShardSpec shardSpec = (DimensionRangeBucketShardSpec) segmentIdWithShardSpec.getShardSpec();
+    Assert.assertEquals(PARTITION_DIMENSIONS, shardSpec.getDimensions());
     Assert.assertEquals(bucketId, shardSpec.getBucketId());
     Assert.assertEquals(partitionStart, shardSpec.getStart());
     Assert.assertEquals(partitionEnd, shardSpec.getEnd());
@@ -255,14 +257,14 @@ public class RangePartitionCachingLocalSegmentAllocatorTest
     return taskLock;
   }
 
-  private static InputRow createInputRow(Interval interval, String dimensionValue)
+  private static InputRow createInputRow(Interval interval, StringTuple dimensionValues)
   {
     long timestamp = interval.getStartMillis();
     InputRow inputRow = EasyMock.mock(InputRow.class);
     EasyMock.expect(inputRow.getTimestamp()).andStubReturn(DateTimes.utc(timestamp));
     EasyMock.expect(inputRow.getTimestampFromEpoch()).andStubReturn(timestamp);
     EasyMock.expect(inputRow.getDimension(PARTITION_DIMENSION))
-            .andStubReturn(Collections.singletonList(dimensionValue));
+            .andStubReturn(Collections.singletonList(dimensionValues.get(0)));
     EasyMock.replay(inputRow);
     return inputRow;
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index 542d39c..ded782c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -131,7 +131,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
   public void forceGuaranteedRollupWithSingleDimPartitionsMissingDimension()
   {
     expectedException.expect(IllegalArgumentException.class);
-    expectedException.expectMessage("partitionDimension must be specified");
+    expectedException.expectMessage("partitionDimensions must be specified");
 
     new ParallelIndexSupervisorTaskBuilder()
         .ingestionSpec(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
index b61b458..90b7de6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionCardinalityTaskTest.java
@@ -176,7 +176,7 @@ public class PartialDimensionCardinalityTaskTest
     public void requiresPartitionDimension() throws Exception
     {
       exception.expect(IllegalArgumentException.class);
-      exception.expectMessage("partitionDimension must be specified");
+      exception.expectMessage("partitionDimensions must be specified");
 
       ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
           .partitionsSpec(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 2e3a8d1..f5985ae 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -97,10 +98,10 @@ public class PartialDimensionDistributionTaskTest
     }
 
     @Test
-    public void requiresSingleDimensionPartitions()
+    public void requiresMultiDimensionPartitions()
     {
       exception.expect(IllegalArgumentException.class);
-      exception.expectMessage("single_dim partitionsSpec required");
+      exception.expectMessage("range partitionsSpec required");
 
       PartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, 1, null);
       ParallelIndexTuningConfig tuningConfig =
@@ -174,10 +175,10 @@ public class PartialDimensionDistributionTaskTest
     }
 
     @Test
-    public void requiresPartitionDimension() throws Exception
+    public void requiresPartitionDimensions() throws Exception
     {
       exception.expect(IllegalArgumentException.class);
-      exception.expectMessage("partitionDimension must be specified");
+      exception.expectMessage("partitionDimensions must be specified");
 
       ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
           .partitionsSpec(
@@ -373,10 +374,10 @@ public class PartialDimensionDistributionTaskTest
       PartitionBoundaries partitions = distribution.getEvenPartitionsByMaxSize(1);
       Assert.assertEquals(minBloomFilterBits + 2, partitions.size()); // 2 = min + max
 
-      String minDimensionValue = dimensionValues.get(0);
+      StringTuple minDimensionValue = StringTuple.create(dimensionValues.get(0));
       Assert.assertEquals(minDimensionValue, ((StringSketch) distribution).getMin());
 
-      String maxDimensionValue = dimensionValues.get(dimensionValues.size() - 1);
+      StringTuple maxDimensionValue = StringTuple.create(dimensionValues.get(dimensionValues.size() - 1));
       Assert.assertEquals(maxDimensionValue, ((StringSketch) distribution).getMax());
     }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java
index e09a49a..09016e1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTaskTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -53,7 +54,7 @@ public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSu
   public void requiresForceGuaranteedRollup()
   {
     exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("single_dim partitionsSpec required");
+    exception.expectMessage("range or single_dim partitionsSpec required");
 
     ParallelIndexTuningConfig tuningConfig = new ParallelIndexTestingFactory.TuningConfigBuilder()
         .forceGuaranteedRollup(false)
@@ -66,10 +67,10 @@ public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSu
   }
 
   @Test
-  public void requiresSingleDimensionPartitions()
+  public void requiresMultiDimensionPartitions()
   {
     exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("single_dim partitionsSpec required");
+    exception.expectMessage("range or single_dim partitionsSpec required");
 
     PartitionsSpec partitionsSpec = new HashedPartitionsSpec(null, 1, null);
     ParallelIndexTuningConfig tuningConfig =
@@ -144,7 +145,7 @@ public class PartialRangeSegmentGenerateTaskTest extends AbstractParallelIndexSu
           ParallelIndexTestingFactory.NUM_ATTEMPTS,
           ingestionSpec,
           ParallelIndexTestingFactory.CONTEXT,
-          ImmutableMap.of(Intervals.ETERNITY, new PartitionBoundaries("a"))
+          ImmutableMap.of(Intervals.ETERNITY, new PartitionBoundaries(StringTuple.create("a")))
       );
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java
index bfdc582..f7be58a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionAdjustingCorePartitionSizeTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -45,6 +45,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -98,10 +99,11 @@ public class RangePartitionAdjustingCorePartitionSizeTest extends AbstractMultiP
         writer.write(StringUtils.format("2020-01-01T00:00:00,zzz,b1,10\n"));
       }
     }
-    final DimensionBasedPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
+    final List<String> partitionDimensions = Collections.singletonList("dim1");
+    final DimensionBasedPartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
         2,
         null,
-        "dim1",
+        partitionDimensions,
         false
     );
     final List<DataSegment> segments = new ArrayList<>(
@@ -124,7 +126,7 @@ public class RangePartitionAdjustingCorePartitionSizeTest extends AbstractMultiP
     final SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segment.getShardSpec();
     Assert.assertEquals(1, shardSpec.getNumCorePartitions());
     Assert.assertEquals(0, shardSpec.getPartitionNum());
-    Assert.assertEquals("dim1", shardSpec.getDimension());
+    Assert.assertEquals(partitionDimensions, shardSpec.getDimensions());
   }
 
   @Test
@@ -137,10 +139,12 @@ public class RangePartitionAdjustingCorePartitionSizeTest extends AbstractMultiP
         writer.write(StringUtils.format("2020-01-01T00:00:00,%s,b1,%d\n", "aa" + (i + 10), 10 * (i + 1)));
       }
     }
-    final DimensionBasedPartitionsSpec partitionsSpec = new SingleDimensionPartitionsSpec(
+
+    final List<String> partitionDimensions = Collections.singletonList("dim1");
+    final DimensionBasedPartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
         2,
         null,
-        "dim1",
+        partitionDimensions,
         false
     );
     final Set<DataSegment> segments = runTestTask(
@@ -161,7 +165,7 @@ public class RangePartitionAdjustingCorePartitionSizeTest extends AbstractMultiP
       final SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segment.getShardSpec();
       Assert.assertEquals(5, shardSpec.getNumCorePartitions());
       Assert.assertTrue(shardSpec.getPartitionNum() < shardSpec.getNumCorePartitions());
-      Assert.assertEquals("dim1", shardSpec.getDimension());
+      Assert.assertEquals(partitionDimensions, shardSpec.getDimensions());
     });
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 1b33c63..f5b76e5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -26,12 +26,14 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 import org.apache.druid.common.config.NullValueHandlingConfig;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@@ -40,6 +42,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.hamcrest.Matchers;
@@ -62,6 +65,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -222,10 +226,10 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
   {
     int targetRowsPerSegment = NUM_ROW * 2 / DIM_FILE_CARDINALITY / NUM_PARTITION;
     final Set<DataSegment> publishedSegments = runTestTask(
-        new SingleDimensionPartitionsSpec(
+        new DimensionRangePartitionsSpec(
             targetRowsPerSegment,
             null,
-            DIM1,
+            Collections.singletonList(DIM1),
             false
         ),
         useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS,
@@ -352,9 +356,9 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     intervalToSegments.asMap().forEach((interval, segments) -> {
       assertNumPartition(segments);
 
-      List<String> allValues = new ArrayList<>(NUM_ROW);
+      List<StringTuple> allValues = new ArrayList<>(NUM_ROW);
       for (DataSegment segment : segments) {
-        List<String> values = getColumnValues(segment, tempSegmentDir);
+        List<StringTuple> values = getColumnValues(segment, tempSegmentDir);
         assertValuesInRange(values, segment);
         allValues.addAll(values);
       }
@@ -373,24 +377,25 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     Assert.assertEquals(NUM_PARTITION, segments.size());
   }
 
-  private List<String> getColumnValues(DataSegment segment, File tempDir)
+  private List<StringTuple> getColumnValues(DataSegment segment, File tempDir)
   {
     List<ScanResultValue> results = querySegment(segment, DIMS, tempDir);
     Assert.assertEquals(1, results.size());
     List<LinkedHashMap<String, String>> rows = (List<LinkedHashMap<String, String>>) results.get(0).getEvents();
     return rows.stream()
                .map(row -> row.get(DIM1))
+               .map(StringTuple::create)
                .collect(Collectors.toList());
   }
 
-  private static void assertValuesInRange(List<String> values, DataSegment segment)
+  private static void assertValuesInRange(List<StringTuple> values, DataSegment segment)
   {
-    SingleDimensionShardSpec shardSpec = (SingleDimensionShardSpec) segment.getShardSpec();
-    String start = shardSpec.getStart();
-    String end = shardSpec.getEnd();
+    DimensionRangeShardSpec shardSpec = (DimensionRangeShardSpec) segment.getShardSpec();
+    StringTuple start = shardSpec.getStartTuple();
+    StringTuple end = shardSpec.getEndTuple();
     Assert.assertTrue(shardSpec.toString(), start != null || end != null);
 
-    for (String value : values) {
+    for (StringTuple value : values) {
       if (start != null) {
         Assert.assertThat(value.compareTo(start), Matchers.greaterThanOrEqualTo(0));
       }
@@ -405,11 +410,12 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     }
   }
 
-  private void assertIntervalHasAllExpectedValues(Interval interval, List<String> actualValues)
+  private void assertIntervalHasAllExpectedValues(Interval interval, List<StringTuple> actualValues)
   {
-    List<String> expectedValues = intervalToDims.get(interval)
+    List<StringTuple> expectedValues = intervalToDims.get(interval)
                                                 .stream()
                                                 .map(d -> (String) d.get(0))
+                                                .map(StringTuple::create)
                                                 .sorted(Comparators.naturalNullsFirst())
                                                 .collect(Collectors.toList());
     actualValues.sort(Comparators.naturalNullsFirst());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMergerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMergerTest.java
index 7678a7d..67f53fd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMergerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchMergerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.timeline.partition.PartitionBoundaries;
 import org.easymock.EasyMock;
 import org.junit.Assert;
@@ -54,15 +55,15 @@ public class StringSketchMergerTest
   @Test
   public void mergesCorrectly()
   {
-    String string1 = "a";
+    StringTuple string1 = StringTuple.create("a");
     StringSketch sketch1 = new StringSketch();
     sketch1.put(string1);
 
-    String string2 = "mn";
+    StringTuple string2 = StringTuple.create("mn");
     StringSketch sketch2 = new StringSketch();
     sketch2.put(string2);
 
-    String string3 = "z";
+    StringTuple string3 = StringTuple.create("z");
     StringSketch sketch3 = new StringSketch();
     sketch3.put(string3);
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
index 43071aa..60d7b54 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.datasketches.quantiles.ItemsSketch;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.jackson.JacksonModule;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.TestHelper;
@@ -49,11 +50,11 @@ public class StringSketchTest
   private static final int FACTOR = 2;
   private static final int NUM_STRING = StringSketch.SKETCH_K * FACTOR;
   private static final double DELTA = ItemsSketch.getNormalizedRankError(StringSketch.SKETCH_K, true) * NUM_STRING;
-  private static final List<String> STRINGS = IntStream.range(0, NUM_STRING)
-                                                       .mapToObj(i -> StringUtils.format("%010d", i))
-                                                       .collect(Collectors.toCollection(ArrayList::new));
-  private static final String MIN_STRING = STRINGS.get(0);
-  private static final String MAX_STRING = STRINGS.get(NUM_STRING - 1);
+  private static final List<StringTuple> STRINGS = IntStream.range(0, NUM_STRING)
+                                                            .mapToObj(i -> StringTuple.create(StringUtils.format("%010d", i)))
+                                                            .collect(Collectors.toCollection(ArrayList::new));
+  private static final StringTuple MIN_STRING = STRINGS.get(0);
+  private static final StringTuple MAX_STRING = STRINGS.get(NUM_STRING - 1);
 
   static {
     ItemsSketch.rand.setSeed(0);  // make sketches deterministic for testing
@@ -95,7 +96,7 @@ public class StringSketchTest
     @Test
     public void putIfNewMin()
     {
-      String value = MAX_STRING;
+      StringTuple value = MAX_STRING;
       Assert.assertEquals(0, getCount());
 
       target.putIfNewMin(value);
@@ -115,7 +116,7 @@ public class StringSketchTest
     @Test
     public void putIfNewMax()
     {
-      String value = MIN_STRING;
+      StringTuple value = MIN_STRING;
       Assert.assertEquals(0, getCount());
 
       target.putIfNewMax(value);
@@ -217,7 +218,7 @@ public class StringSketchTest
 
         int previous = 0;
         for (int i = 1; i < partitionBoundaries.size() - 1; i++) {
-          int current = Integer.parseInt(partitionBoundaries.get(i));
+          int current = Integer.parseInt(partitionBoundaries.get(i).get(0));
           int size = current - previous;
           Assert.assertThat(
               getErrMsgPrefix(targetSize, i) + partitionBoundariesString,
@@ -308,7 +309,7 @@ public class StringSketchTest
 
         int previous = 0;
         for (int i = 1; i < partitionBoundaries.size() - 1; i++) {
-          int current = Integer.parseInt(partitionBoundaries.get(i));
+          int current = Integer.parseInt(partitionBoundaries.get(i).get(0));
           int size = current - previous;
           Assert.assertThat(
               getErrMsgPrefix(maxSize, i) + partitionBoundariesString,
@@ -350,7 +351,7 @@ public class StringSketchTest
 
       int previous = 0;
       for (int i = 1; i < partitionBoundaries.size() - 1; i++) {
-        int current = Integer.parseInt(partitionBoundaries.get(i));
+        int current = Integer.parseInt(partitionBoundaries.get(i).get(0));
         Assert.assertEquals(
             getErrMsgPrefix(1, i) + partitionBoundariesString,
             1,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
index 0ca2712..a182131 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/IndexTaskInputRowIteratorBuilderTestingFactory.java
@@ -44,6 +44,7 @@ class IndexTaskInputRowIteratorBuilderTestingFactory
 {
   static final DateTime TIMESTAMP = DateTimes.utc(0);
   static final String DIMENSION = "dimension";
+  static final List<String> DIMENSIONS = Collections.singletonList(DIMENSION);
   static final Optional<Interval> PRESENT_BUCKET_INTERVAL_OPT = Optional.of(Intervals.ETERNITY);
 
   static InputRow createInputRow(DateTime timestamp)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java
index da53ba2..20065b5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java
@@ -39,7 +39,7 @@ public class RangePartitionIndexTaskInputRowIteratorBuilderTest
   private static final IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester HANDLER_TESTER =
       IndexTaskInputRowIteratorBuilderTestingFactory.createHandlerTester(
           () -> new RangePartitionIndexTaskInputRowIteratorBuilder(
-              IndexTaskInputRowIteratorBuilderTestingFactory.DIMENSION,
+              IndexTaskInputRowIteratorBuilderTestingFactory.DIMENSIONS,
               SKIP_NULL
           )
       );
@@ -181,7 +181,7 @@ public class RangePartitionIndexTaskInputRowIteratorBuilderTest
     IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester handlerTester =
         IndexTaskInputRowIteratorBuilderTestingFactory.createHandlerTester(
             () -> new RangePartitionIndexTaskInputRowIteratorBuilder(
-                IndexTaskInputRowIteratorBuilderTestingFactory.DIMENSION,
+                IndexTaskInputRowIteratorBuilderTestingFactory.DIMENSIONS,
                 !SKIP_NULL
             )
         );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 0b2a586..b58910a 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -28,7 +28,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -297,7 +297,7 @@ public class CompactSegments implements CoordinatorDuty
   private static boolean useRangePartitions(ClientCompactionTaskQueryTuningConfig tuningConfig)
   {
     // dynamic partitionsSpec will be used if getPartitionsSpec() returns null
-    return tuningConfig.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+    return tuningConfig.getPartitionsSpec() instanceof DimensionRangePartitionsSpec;
   }
 
   private static List<TaskStatusPlus> filterNonCompactionTasks(List<TaskStatusPlus> taskStatuses)
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 3f9431c..78a9d55 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
 import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -35,6 +36,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -1805,6 +1807,49 @@ public class IndexerSQLMetadataStorageCoordinatorTest
   }
 
   @Test
+  public void testAddNumberedShardSpecAfterMultiDimensionsShardSpecWithUnknownCorePartitionSize() throws IOException
+  {
+    final String datasource = "datasource";
+    final Interval interval = Intervals.of("2020-01-01/P1D");
+    final String version = "version";
+    final List<String> dimensions = ImmutableList.of("dim");
+    final List<String> metrics = ImmutableList.of("met");
+    final Set<DataSegment> originalSegments = new HashSet<>();
+    for (int i = 0; i < 6; i++) {
+      originalSegments.add(
+          new DataSegment(
+              datasource,
+              interval,
+              version,
+              ImmutableMap.of(),
+              dimensions,
+              metrics,
+              new DimensionRangeShardSpec(
+                  Collections.singletonList("dim"),
+                  i == 0 ? null : StringTuple.create(String.valueOf(i - 1)),
+                  i == 5 ? null : StringTuple.create(String.valueOf(i)),
+                  i,
+                  null // emulate shardSpecs created in older versions of Druid
+              ),
+              9,
+              10L
+          )
+      );
+    }
+    coordinator.announceHistoricalSegments(originalSegments);
+    final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+        datasource,
+        "seq",
+        null,
+        interval,
+        NumberedPartialShardSpec.instance(),
+        version,
+        false
+    );
+    Assert.assertNull(id);
+  }
+
+  @Test
   public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCorePartitionSize() throws IOException
   {
     final String datasource = "datasource";
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
index 4ace675..ef4e54f 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
@@ -22,11 +22,14 @@ package org.apache.druid.segment.realtime.appenderator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BuildingDimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
 import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.HashBucketShardSpec;
 import org.apache.druid.timeline.partition.HashPartitionFunction;
@@ -40,6 +43,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Arrays;
 import java.util.Set;
 
 public class SegmentPublisherHelperTest
@@ -160,6 +164,40 @@ public class SegmentPublisherHelperTest
   }
 
   @Test
+  public void testAnnotateCorePartitionSetSizeForDimensionRangeShardSpec()
+  {
+    final Set<DataSegment> segments = ImmutableSet.of(
+        newSegment(new BuildingDimensionRangeShardSpec(
+            0,
+            Arrays.asList("dim1", "dim2"),
+            null,
+            StringTuple.create("a", "5"),
+            0
+        )),
+        newSegment(new BuildingDimensionRangeShardSpec(
+            1,
+            Arrays.asList("dim1", "dim2"),
+            null,
+            StringTuple.create("a", "5"),
+            1
+        )),
+        newSegment(new BuildingDimensionRangeShardSpec(
+            2,
+            Arrays.asList("dim1", "dim2"),
+            null,
+            StringTuple.create("a", "5"),
+            2
+        ))
+    );
+    final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+    for (DataSegment segment : annotated) {
+      Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
+      final DimensionRangeShardSpec shardSpec = (DimensionRangeShardSpec) segment.getShardSpec();
+      Assert.assertEquals(3, shardSpec.getNumCorePartitions());
+    }
+  }
+
+  @Test
   public void testAnnotateShardSpecDoNothing()
   {
     final Set<DataSegment> segments = ImmutableSet.of(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org