Posted to commits@pinot.apache.org by ja...@apache.org on 2019/01/23 21:11:32 UTC

[incubator-pinot] branch master updated: Replace partition ranges with partitions (#3725)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 525102f  Replace partition ranges with partitions (#3725)
525102f is described below

commit 525102fd907bfce67d2f68d9feaf74987ae177e7
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Jan 23 13:11:27 2019 -0800

    Replace partition ranges with partitions (#3725)
    
    Currently, for partition-based routing and pruning, we store a list
    of ranges (IntRange), but we do not perform any operations on the
    ranges. Instead, storing a set of partitions (int) is enough to serve
    all use cases.
    
    New behavior:
    - For JSON format ColumnPartitionMetadata, replace the "partitionRanges"
      key with the "partitions" key.
      E.g. "partitionRanges":"[0 0],[1 1]" -> "partitions":[0,1]
    - For partition values in segment metadata, replace range strings with
      integers.
      E.g. partitionValues = [0 0],[1 1] -> partitionValues = 0,1
    
    Backward-compatibility:
    - Add a custom deserializer for ColumnPartitionMetadata to handle both
      the new and legacy JSON formats
    - Add ColumnPartitionMetadata.extractPartitions() to handle both the new
      and legacy config formats
    - Add ColumnPartitionMetadataTest
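    
    As a minimal sketch of the format change (mirroring the checks in the
    new ColumnPartitionMetadataTest; the wrapper class name below is
    hypothetical, everything else comes from this patch), both the legacy
    and the new JSON forms deserialize to the same metadata:
    
        import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
        import org.apache.pinot.common.utils.JsonUtils;
    
        public class PartitionMetadataFormatSketch {
          public static void main(String[] args) throws Exception {
            // Legacy range format: each "[start end]" range is expanded into
            // the individual partitions it covers
            ColumnPartitionMetadata legacy = JsonUtils.stringToObject(
                "{\"functionName\":\"murmur\",\"numPartitions\":10,\"partitionRanges\":\"[1 3],[5 5]\"}",
                ColumnPartitionMetadata.class);
    
            // New integer format: partitions are stored directly as a set of ints
            ColumnPartitionMetadata current = JsonUtils.stringToObject(
                "{\"functionName\":\"murmur\",\"numPartitions\":10,\"partitions\":[1,2,3,5]}",
                ColumnPartitionMetadata.class);
    
            System.out.println(legacy.getPartitions());   // [1, 2, 3, 5]
            System.out.println(legacy.equals(current));   // true
          }
        }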
---
 .../broker/pruner/PartitionZKMetadataPruner.java   |  14 +-
 .../PartitionAwareOfflineRoutingTableBuilder.java  |  15 +--
 .../broker/pruner/SegmentZKMetadataPrunerTest.java |   6 +-
 ...rtitionAwareOfflineRoutingTableBuilderTest.java |   5 +-
 ...titionAwareRealtimeRoutingTableBuilderTest.java |   5 +-
 .../pinot/common/config/ColumnPartitionConfig.java |  75 +----------
 .../metadata/segment/ColumnPartitionMetadata.java  | 148 ++++++++++++++-------
 .../metadata/segment/SegmentPartitionMetadata.java |  35 ++---
 .../apache/pinot/common/metrics/ServerGauge.java   |   2 +-
 .../common/metadata/SegmentZKMetadataTest.java     |  16 +--
 .../segment/ColumnPartitionMetadataTest.java       |  71 ++++++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   7 +-
 .../ReplicaGroupSegmentAssignmentStrategy.java     |  11 +-
 .../helix/core/util/ZKMetadataUtils.java           |   2 +-
 .../controller/utils/SegmentMetadataMockUtils.java |  10 +-
 .../core/query/pruner/PartitionSegmentPruner.java  |  27 ++--
 .../converter/RealtimeSegmentConverter.java        |   5 +-
 .../converter/stats/RealtimeColumnStatistics.java  |  48 ++-----
 .../stats/RealtimeNoDictionaryColStatistics.java   |  11 +-
 .../segment/creator/ColumnIndexCreationInfo.java   |   7 +-
 .../core/segment/creator/ColumnStatistics.java     |  10 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  14 +-
 .../stats/AbstractColumnStatisticsCollector.java   |  39 ++----
 .../stats/SegmentPreIndexStatsCollectorImpl.java   |   2 +-
 .../pinot/core/segment/index/ColumnMetadata.java   |  32 +++--
 .../core/segment/index/SegmentMetadataImpl.java    |   2 +-
 .../defaultcolumn/DefaultColumnStatistics.java     |  20 +--
 .../index/creator/SegmentPartitionTest.java        | 107 ++++-----------
 28 files changed, 318 insertions(+), 428 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/pruner/PartitionZKMetadataPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/pruner/PartitionZKMetadataPruner.java
index 6b1d5af..45538f0 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/pruner/PartitionZKMetadataPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/pruner/PartitionZKMetadataPruner.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.pruner;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -86,22 +85,11 @@ public class PartitionZKMetadataPruner implements SegmentZKMetadataPruner {
       return false;
     }
 
-    List<IntRange> partitionRanges = metadata.getPartitionRanges();
-    if (partitionRanges == null || partitionRanges.isEmpty()) {
-      return false;
-    }
-
     String value = filterQueryTree.getValue().get(0);
     PartitionFunction partitionFunction =
         PartitionFunctionFactory.getPartitionFunction(metadata.getFunctionName(), metadata.getNumPartitions());
     int partition = partitionFunction.getPartition(value);
-
-    for (IntRange partitionRange : partitionRanges) {
-      if (partitionRange.containsInteger(partition)) {
-        return false;
-      }
-    }
-    return true;
+    return !metadata.getPartitions().contains(partition);
   }
 
   /**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
index 113a94d..8ecf14d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java
@@ -179,7 +179,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware
   }
 
   /**
-   * Get partition id from segment Zk metadata. This helper function assumes there is only one column partition metadata.
+   * Get partition id from segment Zk metadata.
+   * <p>
+   * Currently we assume the segment is partitioned on at most one column, and contains only 1 partition.
+   * TODO: support segment that is partitioned on multiple columns, or contains multiple partitions
    *
    * @param segmentZKMetadata segment zk metadata for a segment
    * @return partition id
@@ -193,16 +196,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware
         return NO_PARTITION_NUMBER;
       }
       Map<String, ColumnPartitionMetadata> columnPartitionMap = partitionMetadata.getColumnPartitionMap();
-      if (columnPartitionMap == null || columnPartitionMap.size() == 0) {
+      if (columnPartitionMap == null || columnPartitionMap.isEmpty()) {
         return NO_PARTITION_NUMBER;
       }
-      ColumnPartitionMetadata columnPartitionMetadata;
-      if (columnPartitionMap.size() == 1) {
-        columnPartitionMetadata = columnPartitionMap.values().iterator().next();
-        int partitionIdStart = columnPartitionMetadata.getPartitionRanges().get(0).getMaximumInteger();
-        // int partitionIdEnd = columnPartitionMetadata.getPartitionRanges().get(0).getMaximumInteger();
-        return partitionIdStart;
-      }
+      return columnPartitionMap.values().iterator().next().getPartitions().iterator().next();
     }
     // If we use the table level replica group assignment, we can simply return the default partition number.
     return TABLE_LEVEL_PARTITION_NUMBER;
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/pruner/SegmentZKMetadataPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/pruner/SegmentZKMetadataPrunerTest.java
index 98a8dc6..e345033 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/pruner/SegmentZKMetadataPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/pruner/SegmentZKMetadataPrunerTest.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import junit.framework.Assert;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
@@ -36,7 +35,6 @@ import org.testng.annotations.Test;
  * Unit test for {@link SegmentZKMetadataPruner}
  */
 public class SegmentZKMetadataPrunerTest {
-
   private static final int NUM_PARTITION = 11;
   private static final String PARTITION_COLUMN = "partition";
   private static final String PARTITION_FUNCTION_NAME = "modulo";
@@ -48,8 +46,8 @@ public class SegmentZKMetadataPrunerTest {
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
 
     int expectedPartition = 3;
-    columnPartitionMap.put(PARTITION_COLUMN, new ColumnPartitionMetadata(PARTITION_FUNCTION_NAME, NUM_PARTITION,
-        Collections.singletonList(new IntRange(expectedPartition))));
+    columnPartitionMap.put(PARTITION_COLUMN,
+        new ColumnPartitionMetadata(PARTITION_FUNCTION_NAME, NUM_PARTITION, Collections.singleton(expectedPartition)));
 
     SegmentZKMetadataPrunerService prunerService = new SegmentZKMetadataPrunerService(new String[]{PRUNER_NAME});
     SegmentPartitionMetadata segmentPartitionMetadata = new SegmentPartitionMetadata(columnPartitionMap);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
index e95b8e5..71e984a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
@@ -313,8 +312,8 @@ public class PartitionAwareOfflineRoutingTableBuilderTest {
   private SegmentZKMetadata buildOfflineSegmentZKMetadata(String segmentName, int partition) {
     OfflineSegmentZKMetadata metadata = new OfflineSegmentZKMetadata();
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
-    columnPartitionMap.put(PARTITION_COLUMN, new ColumnPartitionMetadata(PARTITION_FUNCTION_NAME, NUM_PARTITION,
-        Collections.singletonList(new IntRange(partition))));
+    columnPartitionMap.put(PARTITION_COLUMN,
+        new ColumnPartitionMetadata(PARTITION_FUNCTION_NAME, NUM_PARTITION, Collections.singleton(partition)));
     SegmentPartitionMetadata segmentPartitionMetadata = new SegmentPartitionMetadata(columnPartitionMap);
 
     metadata.setSegmentName(segmentName);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
index ad20aee..c08657f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareRealtimeRoutingTableBuilderTest.java
@@ -26,7 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
@@ -359,8 +358,8 @@ public class PartitionAwareRealtimeRoutingTableBuilderTest {
   private SegmentZKMetadata buildSegmentZKMetadata(String segmentName, int partition) {
     LLCRealtimeSegmentZKMetadata metadata = new LLCRealtimeSegmentZKMetadata();
     Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
-    columnPartitionMap.put(PARTITION_COLUMN, new ColumnPartitionMetadata(PARTITION_FUNCTION_NAME, NUM_PARTITION,
-        Collections.singletonList(new IntRange(partition))));
+    columnPartitionMap.put(PARTITION_COLUMN,
+        new ColumnPartitionMetadata(PARTITION_FUNCTION_NAME, NUM_PARTITION, Collections.singleton(partition)));
     SegmentPartitionMetadata segmentPartitionMetadata = new SegmentPartitionMetadata(columnPartitionMap);
 
     metadata.setSegmentName(segmentName);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/ColumnPartitionConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/ColumnPartitionConfig.java
index e773172..6b42dc5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/ColumnPartitionConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/ColumnPartitionConfig.java
@@ -21,10 +21,7 @@ package org.apache.pinot.common.config;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.List;
 import javax.annotation.Nonnull;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.utils.EqualityUtils;
 
 
@@ -32,7 +29,6 @@ import org.apache.pinot.common.utils.EqualityUtils;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class ColumnPartitionConfig {
   public static final String PARTITION_VALUE_DELIMITER = ",";
-  public static final String PARTITIONER_DELIMITER = "\t\t";
 
   @ConfigKey("functionName")
   private String _functionName;
@@ -80,75 +76,16 @@ public class ColumnPartitionConfig {
     _numPartitions = numPartitions;
   }
 
-  /**
-   * Helper method to convert an array of ranges in string form (eg [2 3]) into a list
-   * of {@link IntRange}. Expects each string range to be formatted correctly.
-   *
-   * @param inputs Array of ranges in string form.
-   * @return List of IntRange's for the given input.
-   */
-  public static List<IntRange> rangesFromString(String[] inputs) {
-    List<IntRange> ranges = new ArrayList<>(inputs.length);
-    for (String input : inputs) {
-
-      String trimmed = input.trim();
-      String[] split = trimmed.split("\\s+");
-      String startString = split[0].substring(1, split[0].length());
-      String endString = split[1].substring(0, split[1].length() - 1);
-      ranges.add(new IntRange(Integer.parseInt(startString), Integer.parseInt(endString)));
-    }
-    return ranges;
-  }
-
-  /**
-   * Helper method to convert ranges (one or more) in string form (eg "[1 2],[3 4]") into a
-   * list of {@link IntRange}. Expects string is formatted correctly.
-   *
-   * @param input String representation of ranges.
-   * @return List of IntRange's for the specified string.
-   */
-  public static List<IntRange> rangesFromString(String input) {
-    return rangesFromString(input.split(PARTITION_VALUE_DELIMITER));
-  }
-
-  /**
-   * Helper method to convert a list of {@link IntRange} to a delimited string.
-   * The delimiter used is {@link #PARTITION_VALUE_DELIMITER}
-   * @param ranges List of ranges to be converted to String.
-   * @return String representation of the lis tof ranges.
-   */
-  public static String rangesToString(List<IntRange> ranges) {
-    StringBuilder builder = new StringBuilder();
-
-    for (int i = 0; i < ranges.size(); i++) {
-      builder.append("[");
-      IntRange range = ranges.get(i);
-
-      builder.append(range.getMinimumInteger());
-      builder.append(" ");
-      builder.append(range.getMaximumInteger());
-      builder.append("]");
-
-      if (i < ranges.size() - 1) {
-        builder.append(PARTITION_VALUE_DELIMITER);
-      }
-    }
-    return builder.toString();
-  }
-
   @Override
-  public boolean equals(Object o) {
-    if (EqualityUtils.isSameReference(this, o)) {
+  public boolean equals(Object obj) {
+    if (this == obj) {
       return true;
     }
-
-    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
-      return false;
+    if (obj instanceof ColumnPartitionConfig) {
+      ColumnPartitionConfig that = (ColumnPartitionConfig) obj;
+      return _functionName.equals(that._functionName) && _numPartitions == that._numPartitions;
     }
-
-    ColumnPartitionConfig that = (ColumnPartitionConfig) o;
-    return EqualityUtils.isEqual(this._numPartitions, that._numPartitions) &&
-        EqualityUtils.isEqual(this._functionName, that._functionName);
+    return false;
   }
 
   @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
index 86f4be5..0220c4f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadata.java
@@ -19,99 +19,147 @@
 package org.apache.pinot.common.metadata.segment;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
-import org.apache.commons.lang.math.IntRange;
-import org.apache.pinot.common.config.ColumnPartitionConfig;
-import org.apache.pinot.common.utils.EqualityUtils;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
 
 
 /**
  * Class for partition related column metadata:
  * <ul>
- *   <li> Partition function.</li>
- *   <li> Number of partitions. </li>
- *   <li> List of partition ranges. </li>
+ *   <li>Partition function</li>
+ *   <li>Number of total partitions</li>
+ *   <li>Set of partitions the column contains</li>
  * </ul>
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ColumnPartitionMetadata extends ColumnPartitionConfig {
-
-  private final List<IntRange> _partitionRanges;
+@JsonDeserialize(using = ColumnPartitionMetadata.ColumnPartitionMetadataDeserializer.class)
+public class ColumnPartitionMetadata {
+  private final String _functionName;
+  private final int _numPartitions;
+  private final Set<Integer> _partitions;
 
   /**
    * Constructor for the class.
-   * @param functionName Name of the partition function.
-   * @param numPartitions Number of partitions for this column.
-   * @param partitionRanges Partition ranges for the column.
+   *
+   * @param functionName Name of the partition function
+   * @param numPartitions Number of total partitions for this column
+   * @param partitions Set of partitions the column contains
    */
-  public ColumnPartitionMetadata(@JsonProperty("functionName") String functionName,
-      @JsonProperty("numPartitions") int numPartitions,
-      @JsonProperty("partitionRanges") @JsonDeserialize(using = PartitionRangesDeserializer.class) List<IntRange> partitionRanges) {
-    super(functionName, numPartitions);
-    _partitionRanges = partitionRanges;
+  public ColumnPartitionMetadata(String functionName, int numPartitions, Set<Integer> partitions) {
+    _functionName = functionName;
+    _numPartitions = numPartitions;
+    _partitions = partitions;
   }
 
-  /**
-   * Returns the list of partition ranges.
-   *
-   * @return List of partition ranges.
-   */
-  @JsonSerialize(using = PartitionRangesSerializer.class)
-  public List<IntRange> getPartitionRanges() {
-    return _partitionRanges;
+  public String getFunctionName() {
+    return _functionName;
+  }
+
+  public int getNumPartitions() {
+    return _numPartitions;
+  }
+
+  public Set<Integer> getPartitions() {
+    return _partitions;
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
+  public boolean equals(Object obj) {
+    if (this == obj) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
+    if (obj instanceof ColumnPartitionMetadata) {
+      ColumnPartitionMetadata that = (ColumnPartitionMetadata) obj;
+      return _functionName.equals(that._functionName) && _numPartitions == that._numPartitions && _partitions
+          .equals(that._partitions);
     }
-
-    ColumnPartitionMetadata that = (ColumnPartitionMetadata) o;
-    return super.equals(that) && Objects.equals(_partitionRanges, that._partitionRanges);
+    return false;
   }
 
   @Override
   public int hashCode() {
-    int hashCode = _partitionRanges != null ? _partitionRanges.hashCode() : 0;
-    return EqualityUtils.hashCodeOf(super.hashCode(), hashCode);
+    return 37 * 37 * _functionName.hashCode() + 37 * _numPartitions + _partitions.hashCode();
   }
 
   /**
-   * Custom Json serializer for list of IntRange's.
+   * Helper method to extract partitions from configuration.
+   * <p>
+   * There are two formats of partition strings:
+   * <ul>
+   *   <li>Integer format: e.g. {@code "0"}</li>
+   *   <li>Range format (legacy): e.g. {@code "[0 5]"}</li>
+   *   TODO: remove range format once all segments use integer format
+   * </ul>
    */
-  public static class PartitionRangesSerializer extends JsonSerializer<List<IntRange>> {
+  public static Set<Integer> extractPartitions(List partitionList) {
+    Set<Integer> partitions = new HashSet<>();
+    for (Object o : partitionList) {
+      String partitionString = o.toString();
+      if (partitionString.charAt(0) == '[') {
+        // Range format
+        addRangeToPartitions(partitionString, partitions);
+      } else {
+        partitions.add(Integer.parseInt(partitionString));
+      }
+    }
+    return partitions;
+  }
 
-    @Override
-    public void serialize(List<IntRange> value, JsonGenerator jsonGenerator, SerializerProvider provider)
-        throws IOException {
-      jsonGenerator.writeString(ColumnPartitionConfig.rangesToString(value));
+  /**
+   * Helper method to add a partition range to a set of partitions.
+   */
+  private static void addRangeToPartitions(String rangeString, Set<Integer> partitions) {
+    int delimiterIndex = rangeString.indexOf(' ');
+    int start = Integer.parseInt(rangeString.substring(1, delimiterIndex));
+    int end = Integer.parseInt(rangeString.substring(delimiterIndex + 1, rangeString.length() - 1));
+    for (int i = start; i <= end; i++) {
+      partitions.add(i);
     }
   }
 
   /**
-   * Custom Json de-serializer for list of IntRange's.
+   * Custom deserializer for {@link ColumnPartitionMetadata}.
+   * <p>
+   * This deserializer understands the legacy range format: {@code "partitionRanges":"[0 0],[1 1]"}
+   * TODO: remove custom deserializer once all segments use integer format
    */
-  public static class PartitionRangesDeserializer extends JsonDeserializer<List<IntRange>> {
+  public static class ColumnPartitionMetadataDeserializer extends JsonDeserializer<ColumnPartitionMetadata> {
+    private static final String FUNCTION_NAME_KEY = "functionName";
+    private static final String NUM_PARTITIONS_KEY = "numPartitions";
+    private static final String PARTITIONS_KEY = "partitions";
+
+    // DO NOT CHANGE: for backward-compatibility
+    private static final String LEGACY_PARTITIONS_KEY = "partitionRanges";
+    private static final char LEGACY_PARTITION_DELIMITER = ',';
 
     @Override
-    public List<IntRange> deserialize(JsonParser jsonParser, DeserializationContext context)
+    public ColumnPartitionMetadata deserialize(JsonParser p, DeserializationContext ctxt)
         throws IOException {
-      return ColumnPartitionConfig.rangesFromString(jsonParser.getText());
+      JsonNode jsonMetadata = p.getCodec().readTree(p);
+      Set<Integer> partitions = new HashSet<>();
+      JsonNode jsonPartitions = jsonMetadata.get(PARTITIONS_KEY);
+      if (jsonPartitions != null) {
+        // Integer format: "partitions":[0,1,5]
+        for (JsonNode jsonPartition : jsonPartitions) {
+          partitions.add(jsonPartition.asInt());
+        }
+      } else {
+        // Legacy format: "partitionRanges":"[0 1],[5 5]"
+        String partitionRanges = jsonMetadata.get(LEGACY_PARTITIONS_KEY).asText();
+        for (String partitionRange : StringUtils.split(partitionRanges, LEGACY_PARTITION_DELIMITER)) {
+          addRangeToPartitions(partitionRange, partitions);
+        }
+      }
+      return new ColumnPartitionMetadata(jsonMetadata.get(FUNCTION_NAME_KEY).asText(),
+          jsonMetadata.get(NUM_PARTITIONS_KEY).asInt(), partitions);
     }
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java
index 1c41990..bbbb4ab 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentPartitionMetadata.java
@@ -18,16 +18,14 @@
  */
 package org.apache.pinot.common.metadata.segment;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.utils.JsonUtils;
 
 
@@ -73,25 +71,13 @@ public class SegmentPartitionMetadata {
   }
 
   /**
-   * Set the number of partitions for the specified column.
-   *
-   * @param column Column for which to set the number of partitions.
-   * @param numPartitions Number of partitions to set.
-   */
-  @JsonIgnore
-  public void setNumPartitions(String column, int numPartitions) {
-    ColumnPartitionMetadata columnPartitionMetadata = _columnPartitionMap.get(column);
-    columnPartitionMetadata.setNumPartitions(numPartitions);
-  }
-
-  /**
    * Given a JSON string, de-serialize and return an instance of {@link SegmentPartitionMetadata}
    *
    * @param jsonString Input JSON string
    * @return Instance of {@link SegmentPartitionMetadata} built from the input string.
-   * @throws IOException
    */
-  public static SegmentPartitionMetadata fromJsonString(String jsonString) throws IOException {
+  public static SegmentPartitionMetadata fromJsonString(String jsonString)
+      throws IOException {
     return JsonUtils.stringToObject(jsonString, SegmentPartitionMetadata.class);
   }
 
@@ -99,9 +85,9 @@ public class SegmentPartitionMetadata {
    * Returns the JSON equivalent of the object.
    *
    * @return JSON string equivalent of the object.
-   * @throws IOException
    */
-  public String toJsonString() throws IOException {
+  public String toJsonString()
+      throws IOException {
     return JsonUtils.objectToString(this);
   }
 
@@ -118,15 +104,14 @@ public class SegmentPartitionMetadata {
   }
 
   /**
-   * Returns the list of partition ranges for the specified column.
-   * Returns null if the column is not partitioned.
+   * Returns the set of partitions for the specified column, or {@code null} if the column is not partitioned.
    *
-   * @param column Column for which to return the list of partition ranges.
-   * @return List of partition ranges for the specified column.
+   * @param column Column for which to return the set of partitions
+   * @return Set of partitions for the specified column
    */
-  public List<IntRange> getPartitionRanges(String column) {
+  public Set<Integer> getPartitions(String column) {
     ColumnPartitionMetadata columnPartitionMetadata = _columnPartitionMap.get(column);
-    return (columnPartitionMetadata != null) ? columnPartitionMetadata.getPartitionRanges() : null;
+    return (columnPartitionMetadata != null) ? columnPartitionMetadata.getPartitions() : null;
   }
 
   @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index d57c02c..9eedb7c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -44,7 +44,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   STREAM_PARTITION_OFFSET_LAG("messages", false),
   REALTIME_OFFHEAP_MEMORY_USED("bytes", false),
   RUNNING_QUERIES("runningQueries", false),
-  REALTIME_SEGMENT_PARTITION_WIDTH("realtimeSegmentPartitionWidth", false),
+  REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
   LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true);
 
   private final String gaugeName;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
index f3fff4a..923b3b3 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/SegmentZKMetadataTest.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
@@ -78,18 +77,19 @@ public class SegmentZKMetadataTest {
   @Test
   public void segmentPartitionMetadataTest()
       throws IOException {
-
     // Test for partition metadata serialization/de-serialization.
-    String expectedMetadataString =
-        "{\"columnPartitionMap\":{"
-            + "\"column1\":{\"functionName\":\"func1\",\"numPartitions\":7,\"partitionRanges\":\"[5 5],[7 7]\"},"
-            + "\"column2\":{\"functionName\":\"func2\",\"numPartitions\":11,\"partitionRanges\":\"[11 11],[13 13]\"}}}";
-
+    String legacyMetadataString = "{\"columnPartitionMap\":{"
+        + "\"column1\":{\"functionName\":\"func1\",\"numPartitions\":7,\"partitionRanges\":\"[5 5],[7 7]\"},"
+        + "\"column2\":{\"functionName\":\"func2\",\"numPartitions\":11,\"partitionRanges\":\"[11 11],[13 13]\"}}}";
+    String expectedMetadataString = "{\"columnPartitionMap\":{"
+        + "\"column1\":{\"functionName\":\"func1\",\"numPartitions\":7,\"partitions\":[5,7]},"
+        + "\"column2\":{\"functionName\":\"func2\",\"numPartitions\":11,\"partitions\":[11,13]}}}";
+    assertEquals(SegmentPartitionMetadata.fromJsonString(legacyMetadataString).toJsonString(), expectedMetadataString);
     assertEquals(SegmentPartitionMetadata.fromJsonString(expectedMetadataString).toJsonString(),
         expectedMetadataString);
 
     Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap = new HashMap<>();
-    columnPartitionMetadataMap.put("column", new ColumnPartitionMetadata("foo", 7, Collections.singletonList(new IntRange(11))));
+    columnPartitionMetadataMap.put("column", new ColumnPartitionMetadata("foo", 7, Collections.singleton(11)));
     SegmentPartitionMetadata expectedPartitionMetadata = new SegmentPartitionMetadata(columnPartitionMetadataMap);
 
     // Test partition metadata in OfflineSegmentZkMetadata
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadataTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadataTest.java
new file mode 100644
index 0000000..bfdce39
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metadata/segment/ColumnPartitionMetadataTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata.segment;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ColumnPartitionMetadataTest {
+  private static final String FUNCTION_NAME = "murmur";
+  private static final int NUM_PARTITIONS = 10;
+  private static final Set<Integer> PARTITIONS = new HashSet<>(Arrays.asList(1, 2, 3, 5));
+  private static final String LEGACY_METADATA_JSON =
+      "{\"functionName\":\"murmur\",\"numPartitions\":10,\"partitionRanges\":\"[1 3],[5 5]\"}";
+  private static final String LEGACY_PARTITION_RANGES_STRING = "[1 3],[5 5]";
+
+  @Test
+  public void testSerDe()
+      throws Exception {
+    ColumnPartitionMetadata expected = new ColumnPartitionMetadata(FUNCTION_NAME, NUM_PARTITIONS, PARTITIONS);
+    ColumnPartitionMetadata actual =
+        JsonUtils.stringToObject(JsonUtils.objectToString(expected), ColumnPartitionMetadata.class);
+    assertEquals(actual, expected);
+  }
+
+  @Test
+  public void testLegacyMetadataDeserialization()
+      throws Exception {
+    ColumnPartitionMetadata expected = new ColumnPartitionMetadata(FUNCTION_NAME, NUM_PARTITIONS, PARTITIONS);
+    ColumnPartitionMetadata actual = JsonUtils.stringToObject(LEGACY_METADATA_JSON, ColumnPartitionMetadata.class);
+    assertEquals(actual, expected);
+  }
+
+  @Test
+  public void testPartitionsConfig() {
+    PropertiesConfiguration config = new PropertiesConfiguration();
+    config.setProperty("partitions", PARTITIONS);
+    Set<Integer> actual = ColumnPartitionMetadata.extractPartitions(config.getList("partitions"));
+    assertEquals(actual, PARTITIONS);
+  }
+
+  @Test
+  public void testLegacyPartitionRangesConfig() {
+    PropertiesConfiguration config = new PropertiesConfiguration();
+    config.setProperty("partitionRanges", LEGACY_PARTITION_RANGES_STRING);
+    Set<Integer> actual = ColumnPartitionMetadata.extractPartitions(config.getList("partitionRanges"));
+    assertEquals(actual, PARTITIONS);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6786a7b..cdb0b74 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -46,7 +46,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
@@ -290,7 +289,7 @@ public class PinotLLCRealtimeSegmentManager {
         ColumnPartitionConfig columnPartitionConfig = entry.getValue();
         partitionMetadataMap.put(column,
             new ColumnPartitionMetadata(columnPartitionConfig.getFunctionName(), numPartitions,
-                Collections.singletonList(new IntRange(partitionId))));
+                Collections.singleton(partitionId)));
       }
       partitionMetadata = new SegmentPartitionMetadata(partitionMetadataMap);
     }
@@ -309,10 +308,10 @@ public class PinotLLCRealtimeSegmentManager {
       String columnName = entry.getKey();
       ColumnMetadata columnMetadata = entry.getValue();
       // Check if the column metadata contains the partition information
-      if (columnMetadata.getPartitionFunction() != null && columnMetadata.getPartitionRanges() != null) {
+      if (columnMetadata.getPartitionFunction() != null) {
         partitionMetadataMap.put(columnName,
             new ColumnPartitionMetadata(columnMetadata.getPartitionFunction().toString(),
-                columnMetadata.getNumPartitions(), columnMetadata.getPartitionRanges()));
+                columnMetadata.getNumPartitions(), columnMetadata.getPartitions()));
       }
     }
     return new SegmentPartitionMetadata(partitionMetadataMap);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
index 1cd3eca..8f58c1b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java
@@ -67,16 +67,13 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS
         tableConfig.getValidationConfig().getReplicaGroupStrategyConfig();
     boolean mirrorAssignmentAcrossReplicaGroups = replicaGroupStrategyConfig.getMirrorAssignmentAcrossReplicaGroups();
 
-    String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
-
     int partitionNumber = 0;
+    String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
     if (partitionColumn != null) {
-      // TODO: Need to address when we have multiple partition numbers.
+      // TODO: support multiple partitions
       partitionNumber =
-          ((SegmentMetadataImpl) segmentMetadata).getColumnMetadataFor(replicaGroupStrategyConfig.getPartitionColumn())
-              .getPartitionRanges()
-              .get(0)
-              .getMaximumInteger();
+          ((SegmentMetadataImpl) segmentMetadata).getColumnMetadataFor(partitionColumn).getPartitions().iterator()
+              .next();
     }
 
     // Perform the segment assignment.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index b56ac38..884fbf0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -61,7 +61,7 @@ public class ZKMetadataUtils {
         if (partitionFunction != null) {
           ColumnPartitionMetadata columnPartitionMetadata =
               new ColumnPartitionMetadata(partitionFunction.toString(), columnMetadata.getNumPartitions(),
-                  columnMetadata.getPartitionRanges());
+                  columnMetadata.getPartitions());
           columnPartitionMap.put(column, columnPartitionMetadata);
         }
       }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index 77ad7d6..0c4ade0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pinot.controller.utils;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.core.segment.index.ColumnMetadata;
@@ -74,9 +73,8 @@ public class SegmentMetadataMockUtils {
   public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String tableName, String segmentName,
       String columnName, int partitionNumber) {
     ColumnMetadata columnMetadata = mock(ColumnMetadata.class);
-    List<IntRange> partitionRanges = new ArrayList<>();
-    partitionRanges.add(new IntRange(partitionNumber));
-    when(columnMetadata.getPartitionRanges()).thenReturn(partitionRanges);
+    Set<Integer> partitions = Collections.singleton(partitionNumber);
+    when(columnMetadata.getPartitions()).thenReturn(partitions);
 
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     if (columnName != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/PartitionSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/PartitionSegmentPruner.java
index d3f8c86..635ca12 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/PartitionSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/PartitionSegmentPruner.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -95,25 +94,19 @@ public class PartitionSegmentPruner extends AbstractSegmentPruner {
 
     // Leaf node
     String column = filterQueryTree.getColumn();
-    ColumnMetadata metadata = columnMetadataMap.get(column);
-    if (metadata == null) {
-      return false;
+    ColumnMetadata columnMetadata = columnMetadataMap.get(column);
+    // NOTE: should have already been pruned in DataSchemaSegmentPruner
+    if (columnMetadata == null) {
+      return true;
     }
 
-    List<IntRange> partitionRanges = metadata.getPartitionRanges();
-    if (partitionRanges == null || partitionRanges.isEmpty()) {
+    PartitionFunction partitionFunction = columnMetadata.getPartitionFunction();
+    if (partitionFunction != null) {
+      Comparable value = getValue(filterQueryTree.getValue().get(0), columnMetadata.getDataType());
+      int partition = partitionFunction.getPartition(value);
+      return !columnMetadata.getPartitions().contains(partition);
+    } else {
       return false;
     }
-
-    Comparable value = getValue(filterQueryTree.getValue().get(0), metadata.getDataType());
-    PartitionFunction partitionFunction = metadata.getPartitionFunction();
-    int partition = partitionFunction.getPartition(value);
-
-    for (IntRange partitionRange : partitionRanges) {
-      if (partitionRange.containsInteger(partition)) {
-        return false;
-      }
-    }
-    return true;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index ed296f6..084595d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -134,9 +134,8 @@ public class RealtimeSegmentConverter {
     if (segmentPartitionConfig != null && segmentPartitionConfig.getColumnPartitionMap() != null) {
       Map<String, ColumnPartitionConfig> columnPartitionMap = segmentPartitionConfig.getColumnPartitionMap();
       for (String columnName : columnPartitionMap.keySet()) {
-        int partitionRangeWidth = driver.getSegmentStats().getColumnProfileFor(columnName).getPartitionRangeWidth();
-        serverMetrics.addValueToTableGauge(tableName, ServerGauge.REALTIME_SEGMENT_PARTITION_WIDTH,
-            partitionRangeWidth);
+        int numPartitions = driver.getSegmentStats().getColumnProfileFor(columnName).getPartitions().size();
+        serverMetrics.addValueToTableGauge(tableName, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS, numPartitions);
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
index 565132b..714a0a9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pinot.core.realtime.converter.stats;
 
-import java.util.Arrays;
-import java.util.List;
-import org.apache.commons.lang.math.IntRange;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.core.common.Block;
@@ -45,8 +44,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics {
   private final Block _block;
   private PartitionFunction _partitionFunction;
   private int _numPartitions;
-  private int _partitionRangeStart = Integer.MAX_VALUE;
-  private int _partitionRangeEnd = Integer.MIN_VALUE;
+  private Set<Integer> _partitions;
 
   public RealtimeColumnStatistics(ColumnDataSource dataSource, int[] sortedDocIdIterationOrder,
       ColumnPartitionConfig columnPartitionConfig) {
@@ -60,7 +58,12 @@ public class RealtimeColumnStatistics implements ColumnStatistics {
       _partitionFunction =
           (functionName != null) ? PartitionFunctionFactory.getPartitionFunction(functionName, _numPartitions) : null;
       if (_partitionFunction != null) {
-        updatePartition();
+        // Iterate over the dictionary to check the partitioning
+        _partitions = new HashSet<>();
+        int length = _dictionaryReader.length();
+        for (int i = 0; i < length; i++) {
+          _partitions.add(_partitionFunction.getPartition(_dictionaryReader.get(i)));
+        }
       }
     }
   }
@@ -203,36 +206,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics {
   }
 
   @Override
-  public List<IntRange> getPartitionRanges() {
-    if (_partitionRangeStart <= _partitionRangeEnd) {
-      return Arrays.asList(new IntRange(_partitionRangeStart, _partitionRangeEnd));
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Update partition ranges based on column values.
-   *
-   */
-  void updatePartition() {
-    // Iterate over the dictionary to check the partitioning
-    final int length = _dictionaryReader.length();
-    for (int i = 0; i < length; i++) {
-      int partition = _partitionFunction.getPartition(_dictionaryReader.get(i));
-
-      if (partition < _partitionRangeStart) {
-        _partitionRangeStart = partition;
-      }
-
-      if (partition > _partitionRangeEnd) {
-        _partitionRangeEnd = partition;
-      }
-    }
-  }
-
-  @Override
-  public int getPartitionRangeWidth() {
-    return _partitionRangeEnd - _partitionRangeStart + 1;
+  public Set<Integer> getPartitions() {
+    return _partitions;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
index 1615558..ea85788 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.core.realtime.converter.stats;
 
-import java.util.List;
-import org.apache.commons.lang.math.IntRange;
+import java.util.Set;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -28,6 +27,7 @@ import org.apache.pinot.core.segment.index.data.source.ColumnDataSource;
 
 import static org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY;
 
+
 public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
 
   final BlockValSet _blockValSet;
@@ -102,15 +102,10 @@ public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
   }
 
   @Override
-  public List<IntRange> getPartitionRanges() {
+  public Set<Integer> getPartitions() {
     return null;
   }
 
-  @Override
-  public int getPartitionRangeWidth() {
-    return 0;
-  }
-
   private int lengthOfDataType() {
     switch (_blockValSet.getValueType()) {
       case INT:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
index 2755743..2d6b860 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator;
 
-import java.util.List;
-import org.apache.commons.lang.math.IntRange;
+import java.util.Set;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.core.common.Constants;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -103,8 +102,8 @@ public class ColumnIndexCreationInfo {
     return columnStatistics.getLengthOfLargestElement();
   }
 
-  public List<IntRange> getPartitionRanges() {
-    return columnStatistics.getPartitionRanges();
+  public Set<Integer> getPartitions() {
+    return columnStatistics.getPartitions();
   }
 
   public PartitionFunction getPartitionFunction() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
index d497002eb..e80838f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator;
 
-import java.util.List;
-import org.apache.commons.lang.math.IntRange;
+import java.util.Set;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 
 
@@ -86,10 +85,5 @@ public interface ColumnStatistics {
 
     int getNumPartitions();
 
-    List<IntRange> getPartitionRanges();
-
-    /**
-     * Returns the width of the partition range for this column, used when doing per-column partitioning.
-     */
-    int getPartitionRangeWidth();
+    Set<Integer> getPartitions();
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 87a9493..76d4652 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -24,13 +24,10 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.math.IntRange;
-import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.data.DateTimeFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.FieldSpec.FieldType;
@@ -417,14 +414,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
 
     PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
-    int numPartitions = columnIndexCreationInfo.getNumPartitions();
-    List<IntRange> partitionRanges = columnIndexCreationInfo.getPartitionRanges();
-    if (partitionFunction != null && partitionRanges != null) {
+    if (partitionFunction != null) {
       properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, PARTITION_FUNCTION),
           partitionFunction.toString());
-      properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, NUM_PARTITIONS), numPartitions);
-      String partitionValues = ColumnPartitionConfig.rangesToString(partitionRanges);
-      properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, PARTITION_VALUES), partitionValues);
+      properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, NUM_PARTITIONS),
+          columnIndexCreationInfo.getNumPartitions());
+      properties.setProperty(V1Constants.MetadataKeys.Column.getKeyFor(column, PARTITION_VALUES),
+          columnIndexCreationInfo.getPartitions());
     }
 
     // datetime field
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index f1e2ead..a1ae75f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pinot.core.segment.creator.impl.stats;
 
-import java.util.Arrays;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.avro.reflect.Nullable;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -40,7 +39,6 @@ import org.apache.pinot.core.segment.creator.impl.V1Constants;
  * compute max
  * see if column isSorted
  */
-
 public abstract class AbstractColumnStatisticsCollector implements ColumnStatistics {
   protected static final int INITIAL_HASH_SET_SIZE = 1000;
   private Object previousValue = null;
@@ -50,11 +48,9 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
 
   protected int totalNumberOfEntries = 0;
   protected int maxNumberOfMultiValues = 0;
-  private int numInputNullValues = 0;  // Number of rows in which this column was null in the input.
   private PartitionFunction partitionFunction;
   private final int numPartitions;
-  private int partitionRangeStart = Integer.MAX_VALUE;
-  private int partitionRangeEnd = Integer.MIN_VALUE;
+  private final Set<Integer> _partitions;
 
   void updateTotalNumberOfEntries(Object[] entries) {
     totalNumberOfEntries += entries.length;
@@ -69,6 +65,11 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     fieldSpec = statsCollectorConfig.getFieldSpecForColumn(column);
     partitionFunction = statsCollectorConfig.getPartitionFunction(column);
     numPartitions = statsCollectorConfig.getNumPartitions(column);
+    if (partitionFunction != null) {
+      _partitions = new HashSet<>();
+    } else {
+      _partitions = null;
+    }
     addressNull(previousValue, fieldSpec.getDataType());
     previousValue = null;
   }
@@ -167,17 +168,13 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
   }
 
   /**
-   * Returns the partition range within which the column values exist.
+   * Returns the partitions within which the column values exist.
    *
-   * @return List of ranges for the column values.
+   * @return Set of partitions for the column values.
    */
   @Nullable
-  public List<IntRange> getPartitionRanges() {
-    if (partitionRangeStart <= partitionRangeEnd) {
-      return Arrays.asList(new IntRange(partitionRangeStart, partitionRangeEnd));
-    } else {
-      return null;
-    }
+  public Set<Integer> getPartitions() {
+    return _partitions;
   }
 
   /**
@@ -187,19 +184,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
    */
   protected void updatePartition(Object value) {
     if (partitionFunction != null) {
-      int partition = partitionFunction.getPartition(value);
-
-      if (partition < partitionRangeStart) {
-        partitionRangeStart = partition;
-      }
-      if (partition > partitionRangeEnd) {
-        partitionRangeEnd = partition;
-      }
+      _partitions.add(partitionFunction.getPartition(value));
     }
   }
-
-  @Override
-  public int getPartitionRangeWidth() {
-    return partitionRangeEnd - partitionRangeStart + 1;
-  }
 }
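
A quick sketch of the new bookkeeping, assuming a plain modulo mapping as a stand-in for the column's PartitionFunction: every observed value maps to a partition id that is added to a set, so disjoint partitions (e.g. 5 and 7) are no longer widened into a single [5 7] range.

    import java.util.HashSet;
    import java.util.Set;

    public class PartitionTrackingSketch {
      public static void main(String[] args) {
        int numPartitions = 20;
        Set<Integer> partitions = new HashSet<>();

        // Stand-in for partitionFunction.getPartition(value): a simple modulo scheme.
        for (int value : new int[]{7, 27, 47, 105}) {
          partitions.add(value % numPartitions);
        }

        // Prints the exact partitions seen, e.g. [5, 7], instead of a widened range.
        System.out.println(partitions);
      }
    }
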
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
index 1a80dc7..860b046 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
@@ -156,7 +156,7 @@ public class SegmentPreIndexStatsCollectorImpl implements SegmentPreIndexStatsCo
         LOGGER.info("column type : " + _statsCollectorConfig.getSchema().getFieldSpecFor(column).getDataType());
 
         if (statisticsCollector.getPartitionFunction() != null) {
-          LOGGER.info("min partition value: " + statisticsCollector.getPartitionRanges().toString());
+          LOGGER.info("partitions: " + statisticsCollector.getPartitions().toString());
         }
         LOGGER.info("***********************************************");
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/ColumnMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/ColumnMetadata.java
index 3105390..1a37ffa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/ColumnMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/ColumnMetadata.java
@@ -20,12 +20,10 @@ package org.apache.pinot.core.segment.index;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.lang.reflect.Field;
-import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang.StringEscapeUtils;
-import org.apache.commons.lang.math.IntRange;
-import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.data.DateTimeFieldSpec;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
@@ -34,6 +32,7 @@ import org.apache.pinot.common.data.FieldSpec.FieldType;
 import org.apache.pinot.common.data.MetricFieldSpec;
 import org.apache.pinot.common.data.MetricFieldSpec.DerivedMetricType;
 import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
@@ -81,7 +80,7 @@ public class ColumnMetadata {
   private final Comparable maxValue;
   private final PartitionFunction partitionFunction;
   private final int numPartitions;
-  private final List<IntRange> partitionRanges;
+  private final Set<Integer> _partitions;
   private final String dateTimeFormat;
   private final String dateTimeGranularity;
 
@@ -190,9 +189,8 @@ public class ColumnMetadata {
           PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions);
       builder.setPartitionFunction(partitionFunction);
       builder.setNumPartitions(numPartitions);
-
-      String[] valueString = config.getStringArray(getKeyFor(column, V1Constants.MetadataKeys.Column.PARTITION_VALUES));
-      builder.setPartitionValues(ColumnPartitionConfig.rangesFromString(valueString));
+      builder.setPartitions(ColumnPartitionMetadata
+          .extractPartitions(config.getList(getKeyFor(column, V1Constants.MetadataKeys.Column.PARTITION_VALUES))));
     }
 
     return builder.build();
@@ -206,8 +204,8 @@ public class ColumnMetadata {
     return numPartitions;
   }
 
-  public List<IntRange> getPartitionRanges() {
-    return partitionRanges;
+  public Set<Integer> getPartitions() {
+    return _partitions;
   }
 
   public static class Builder {
@@ -238,8 +236,8 @@ public class ColumnMetadata {
     private Comparable minValue;
     private Comparable maxValue;
     private PartitionFunction partitionFunction;
-    private List<IntRange> partitionValues = null;
     private int numPartitions;
+    private Set<Integer> _partitions;
     private String dateTimeFormat;
     private String dateTimeGranularity;
 
@@ -382,8 +380,8 @@ public class ColumnMetadata {
       this.numPartitions = numPartitions;
     }
 
-    public Builder setPartitionValues(List<IntRange> partitionValues) {
-      this.partitionValues = partitionValues;
+    public Builder setPartitions(Set<Integer> partitions) {
+      _partitions = partitions;
       return this;
     }
 
@@ -400,9 +398,9 @@ public class ColumnMetadata {
     public ColumnMetadata build() {
       return new ColumnMetadata(columnName, cardinality, totalDocs, totalRawDocs, totalAggDocs, dataType,
           bitsPerElement, columnMaxLength, fieldType, isSorted, containsNulls, hasDictionary, hasInvertedIndex,
-          isSingleValue, maxNumberOfMultiValues, totalNumberOfEntries, isAutoGenerated, isVirtual, defaultNullValueString,
-          timeUnit, paddingCharacter, derivedMetricType, fieldSize, originColumnName, minValue, maxValue,
-          partitionFunction, numPartitions, partitionValues, dateTimeFormat, dateTimeGranularity);
+          isSingleValue, maxNumberOfMultiValues, totalNumberOfEntries, isAutoGenerated, isVirtual,
+          defaultNullValueString, timeUnit, paddingCharacter, derivedMetricType, fieldSize, originColumnName, minValue,
+          maxValue, partitionFunction, numPartitions, _partitions, dateTimeFormat, dateTimeGranularity);
     }
   }
 
@@ -412,7 +410,7 @@ public class ColumnMetadata {
       int maxNumberOfMultiValues, int totalNumberOfEntries, boolean isAutoGenerated, boolean isVirtual,
       String defaultNullValueString, TimeUnit timeUnit, char paddingCharacter, DerivedMetricType derivedMetricType,
       int fieldSize, String originColumnName, Comparable minValue, Comparable maxValue,
-      PartitionFunction partitionFunction, int numPartitions, List<IntRange> partitionRanges, String dateTimeFormat,
+      PartitionFunction partitionFunction, int numPartitions, Set<Integer> partitions, String dateTimeFormat,
       String dateTimeGranularity) {
     this.columnName = columnName;
     this.cardinality = cardinality;
@@ -442,7 +440,7 @@ public class ColumnMetadata {
     this.maxValue = maxValue;
     this.partitionFunction = partitionFunction;
     this.numPartitions = numPartitions;
-    this.partitionRanges = partitionRanges;
+    _partitions = partitions;
     this.dateTimeFormat = dateTimeFormat;
     this.dateTimeGranularity = dateTimeGranularity;
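
On the read side, segments written before this change still carry legacy range strings, so ColumnPartitionMetadata.extractPartitions has to accept both shapes. The helper below is only an illustrative sketch of that kind of backward-compatible parsing, under the assumption that legacy values look like "[start end]"; it is not the actual implementation.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ExtractPartitionsSketch {
      // Accepts either plain integers ("3") or legacy "[start end]" range strings
      // and flattens everything into a set of partition ids.
      static Set<Integer> toPartitions(List<?> values) {
        Set<Integer> partitions = new HashSet<>();
        for (Object value : values) {
          String s = value.toString().trim();
          if (s.startsWith("[")) {
            String[] bounds = s.substring(1, s.length() - 1).trim().split("\\s+");
            for (int p = Integer.parseInt(bounds[0]); p <= Integer.parseInt(bounds[1]); p++) {
              partitions.add(p);
            }
          } else {
            partitions.add(Integer.parseInt(s));
          }
        }
        return partitions;
      }

      public static void main(String[] args) {
        System.out.println(toPartitions(Arrays.asList("0", "1")));         // new format -> [0, 1]
        System.out.println(toPartitions(Arrays.asList("[0 0]", "[2 3]"))); // legacy format -> [0, 2, 3]
      }
    }
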
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
index dcc5485..71e39b7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/SegmentMetadataImpl.java
@@ -275,7 +275,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
    */
   private static void addPhysicalColumns(List src, Collection<String> dest) {
     for (Object o : src) {
-      String column = (String) o;
+      String column = o.toString();
       if (!column.isEmpty() && column.charAt(0) != '$') {
         dest.add(column);
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/DefaultColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/DefaultColumnStatistics.java
index e9c02ff..4b7e993 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/DefaultColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/DefaultColumnStatistics.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.core.segment.index.loader.defaultcolumn;
 
-import java.util.List;
-import org.apache.commons.lang.math.IntRange;
+import java.util.Set;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.segment.creator.ColumnStatistics;
 
@@ -36,10 +35,6 @@ public class DefaultColumnStatistics implements ColumnStatistics {
   private final int _totalNumberOfEntries;
   private final int _maxNumberOfMultiValues;
   private final boolean _hasNull = false;
-  private final PartitionFunction _partitionFunction = null;
-  private final int _numPartitions = -1;
-  private final List<IntRange> _partitionRanges = null;
-  private final int _partitionRangeWidth = -1;
 
   public DefaultColumnStatistics(
       Object minValue,
@@ -109,21 +104,16 @@ public class DefaultColumnStatistics implements ColumnStatistics {
 
   @Override
   public PartitionFunction getPartitionFunction() {
-    return _partitionFunction;
+    return null;
   }
 
   @Override
   public int getNumPartitions() {
-    return _numPartitions;
+    return 0;
   }
 
   @Override
-  public List<IntRange> getPartitionRanges() {
-    return _partitionRanges;
-  }
-
-  @Override
-  public int getPartitionRangeWidth() {
-    return _partitionRangeWidth;
+  public Set<Integer> getPartitions() {
+    return null;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentPartitionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentPartitionTest.java
index 6ce8c6d..a313809 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentPartitionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentPartitionTest.java
@@ -21,12 +21,14 @@ package org.apache.pinot.core.segment.index.creator;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.math.IntRange;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.data.DimensionFieldSpec;
@@ -38,6 +40,7 @@ import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.partition.ModuloPartitionFunction;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -47,11 +50,12 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl
 import org.apache.pinot.core.segment.index.ColumnMetadata;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.*;
+
 
 /**
  * Unit test for Segment partitioning:
@@ -74,8 +78,9 @@ public class SegmentPartitionTest {
   private static final int NUM_PARTITIONS = 20; // For modulo function
   private static final int PARTITION_DIVISOR = 5; // Allowed partition values
   private static final int MAX_PARTITION_VALUE = (PARTITION_DIVISOR - 1);
-  private static final String EXPECTED_PARTITION_VALUE_STRING = "[0 " + MAX_PARTITION_VALUE + "]";
-  private static final String EXPECTED_PARTITION_FUNCTION = "MoDuLo";
+  private static final String PARTITION_FUNCTION_NAME = "MoDuLo";
+
+  private final Set<Integer> _expectedPartitions = new HashSet<>();
   private IndexSegment _segment;
 
   @BeforeClass
@@ -99,29 +104,20 @@ public class SegmentPartitionTest {
    *   <li> Partitioning metadata is dropped for column that does not comply to partitioning scheme. </li>
    *   <li> Partitioning metadata is not written out for column for which the metadata was not specified. </li>
    * </ul>
-   * @throws Exception
    */
   @Test
-  public void testMetadata()
-      throws Exception {
+  public void testMetadata() {
     SegmentMetadataImpl metadata = (SegmentMetadataImpl) _segment.getSegmentMetadata();
     ColumnMetadata columnMetadata = metadata.getColumnMetadataFor(PARTITIONED_COLUMN_NAME);
 
-    Assert.assertEquals(columnMetadata.getPartitionFunction().toString().toLowerCase(),
-        EXPECTED_PARTITION_FUNCTION.toLowerCase());
+    assertTrue(columnMetadata.getPartitionFunction() instanceof ModuloPartitionFunction);
 
-    List<IntRange> partitionValues = columnMetadata.getPartitionRanges();
-    Assert.assertEquals(partitionValues.size(), 1);
-    List<IntRange> expectedPartitionValues = ColumnPartitionConfig.rangesFromString(EXPECTED_PARTITION_VALUE_STRING);
-
-    IntRange actualValue = partitionValues.get(0);
-    IntRange expectedPartitionValue = expectedPartitionValues.get(0);
-    Assert.assertEquals(actualValue.getMinimumInteger(), expectedPartitionValue.getMinimumInteger());
-    Assert.assertEquals(actualValue.getMaximumInteger(), expectedPartitionValue.getMaximumInteger());
+    Set<Integer> actualPartitions = columnMetadata.getPartitions();
+    assertEquals(actualPartitions, _expectedPartitions);
 
     columnMetadata = metadata.getColumnMetadataFor(NON_PARTITIONED_COLUMN_NAME);
-    Assert.assertNull(columnMetadata.getPartitionFunction());
-    Assert.assertNull(columnMetadata.getPartitionRanges());
+    assertNull(columnMetadata.getPartitionFunction());
+    assertNull(columnMetadata.getPartitions());
   }
 
   /**
@@ -147,14 +143,14 @@ public class SegmentPartitionTest {
       BrokerRequest brokerRequest = compiler.compileToBrokerRequest(query);
       FilterQueryTree filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
 
-      Assert.assertEquals(pruner.prune(_segment, filterQueryTree), (columnValue % NUM_PARTITIONS > MAX_PARTITION_VALUE),
+      assertEquals(pruner.prune(_segment, filterQueryTree), (columnValue % NUM_PARTITIONS > MAX_PARTITION_VALUE),
           "Failed for column value: " + columnValue);
 
       // Test for non partitioned column.
       query = buildQuery(TABLE_NAME, NON_PARTITIONED_COLUMN_NAME, columnValue);
       brokerRequest = compiler.compileToBrokerRequest(query);
       filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
-      Assert.assertFalse(pruner.prune(_segment, filterQueryTree));
+      assertFalse(pruner.prune(_segment, filterQueryTree));
 
       // Test for AND query: Segment can be pruned out if partitioned column has value outside of range.
       int partitionColumnValue = Math.abs(random.nextInt());
@@ -164,7 +160,7 @@ public class SegmentPartitionTest {
 
       brokerRequest = compiler.compileToBrokerRequest(query);
       filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
-      Assert.assertEquals(pruner.prune(_segment, filterQueryTree),
+      assertEquals(pruner.prune(_segment, filterQueryTree),
           (partitionColumnValue % NUM_PARTITIONS) > MAX_PARTITION_VALUE);
 
       // Test for OR query: Segment should never be pruned as there's an OR with non partitioned column.
@@ -172,54 +168,7 @@ public class SegmentPartitionTest {
           nonPartitionColumnValue, FilterOperator.OR);
       brokerRequest = compiler.compileToBrokerRequest(query);
       filterQueryTree = RequestUtils.generateFilterQueryTree(brokerRequest);
-      Assert.assertFalse(pruner.prune(_segment, filterQueryTree));
-    }
-  }
-
-  /**
-   * Unit test for utility the converts String ranges into IntRanges and back.
-   * <ul>
-   *   <li> Generates a list of String ranges</li>
-   *   <li> Ensures that conversion to IntRanges is as expected</li>
-   *   <li> Ensures that the IntRanges when converted back to String ranges are as expected. </li>
-   * </ul>
-   */
-  @Test
-  public void testStringRangeConversions() {
-    Random random = new Random();
-
-    for (int i = 0; i < 1000; i++) {
-      int numRanges = 1 + random.nextInt(1000);
-      String[] ranges = new String[numRanges];
-      List<IntRange> expected = new ArrayList<>(numRanges);
-      StringBuilder builder = new StringBuilder();
-
-      for (int j = 0; j < numRanges; j++) {
-        int start = random.nextInt();
-        int end = random.nextInt();
-
-        // Generate random ranges such that start <= end.
-        if (start > end) {
-          start ^= end;
-          end = start ^ end;
-          start = start ^ end;
-        }
-
-        ranges[j] = buildRangeString(start, end);
-        expected.add(new IntRange(start, end));
-
-        builder.append(ranges[j]);
-        if (j < numRanges - 1) {
-          builder.append(ColumnPartitionConfig.PARTITION_VALUE_DELIMITER);
-        }
-      }
-      String expectedString = builder.toString();
-
-      List<IntRange> actual = ColumnPartitionConfig.rangesFromString(ranges);
-      Assert.assertEquals(actual, expected);
-
-      String actualString = ColumnPartitionConfig.rangesToString(actual);
-      Assert.assertEquals(actualString, expectedString);
+      assertFalse(pruner.prune(_segment, filterQueryTree));
     }
   }
 
@@ -249,10 +198,10 @@ public class SegmentPartitionTest {
       String partitionColumn = entry.getKey();
 
       ColumnPartitionConfig expectedColumnConfig = expectedMap.get(partitionColumn);
-      Assert.assertNotNull(expectedColumnConfig);
+      assertNotNull(expectedColumnConfig);
       ColumnPartitionConfig actualColumnConfig = entry.getValue();
 
-      Assert.assertEquals(actualColumnConfig.getFunctionName(), expectedColumnConfig.getFunctionName());
+      assertEquals(actualColumnConfig.getFunctionName(), expectedColumnConfig.getFunctionName());
     }
 
     // Test that adding new fields does not break json de-serialization.
@@ -261,7 +210,7 @@ public class SegmentPartitionTest {
     String jsonStringWithoutNewField =
         "{\"columnPartitionMap\":{\"column_0\":{\"functionName\":\"function\",\"numPartitions\":10}}}";
 
-    Assert.assertEquals(jsonStringWithoutNewField,
+    assertEquals(jsonStringWithoutNewField,
         SegmentPartitionConfig.fromJsonString(jsonStringWithNewField).toJsonString());
   }
 
@@ -275,8 +224,8 @@ public class SegmentPartitionTest {
         + operator + " " + nonPartitionColumn + " = " + nonPartitionedColumnValue;
   }
 
-  private String buildRangeString(int start, int end) {
-    return "[" + start + " " + end + "]";
+  private List buildPartitionList(int partition) {
+    return Collections.singletonList(partition);
   }
 
   /**
@@ -297,8 +246,8 @@ public class SegmentPartitionTest {
     Random random = new Random();
     Map<String, ColumnPartitionConfig> partitionFunctionMap = new HashMap<>();
 
-    partitionFunctionMap.put(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(EXPECTED_PARTITION_FUNCTION,
-        NUM_PARTITIONS));
+    partitionFunctionMap
+        .put(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS));
 
     SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(partitionFunctionMap);
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(schema);
@@ -312,7 +261,9 @@ public class SegmentPartitionTest {
     for (int i = 0; i < NUM_ROWS; i++) {
       HashMap<String, Object> map = new HashMap<>();
 
-      int validPartitionedValue = random.nextInt(100) * 20 + random.nextInt(PARTITION_DIVISOR);
+      int partition = random.nextInt(PARTITION_DIVISOR);
+      int validPartitionedValue = random.nextInt(100) * 20 + partition;
+      _expectedPartitions.add(partition);
       map.put(PARTITIONED_COLUMN_NAME, validPartitionedValue);
       map.put(NON_PARTITIONED_COLUMN_NAME, validPartitionedValue);
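
A side note on the generated data: NUM_PARTITIONS is 20 and random.nextInt(100) * 20 is always a multiple of it, so validPartitionedValue % NUM_PARTITIONS equals the chosen partition by construction, which is what lets the test record _expectedPartitions up front. A small standalone check of that arithmetic (constants mirror the test):

    import java.util.Random;

    public class PartitionedDataSketch {
      public static void main(String[] args) {
        int numPartitions = 20;     // NUM_PARTITIONS in the test
        int partitionDivisor = 5;   // PARTITION_DIVISOR in the test
        Random random = new Random();

        for (int i = 0; i < 5; i++) {
          int partition = random.nextInt(partitionDivisor);
          // random.nextInt(100) * 20 is a multiple of numPartitions, so the modulo is exactly 'partition'.
          int value = random.nextInt(100) * 20 + partition;
          System.out.println(value + " % " + numPartitions + " = " + (value % numPartitions));
        }
      }
    }
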
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org