You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/19 01:04:07 UTC

[pinot] branch master updated: Reduce the heap memory usage for segment creation (#8846)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fef95ebe6 Reduce the heap memory usage for segment creation (#8846)
7fef95ebe6 is described below

commit 7fef95ebe6bf76821df9d40ceb46b622c8037ad6
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sat Jun 18 18:04:00 2022 -0700

    Reduce the heap memory usage for segment creation (#8846)
    
    - Enhance `StatsCollector` to release one copy of the values early, once they are no longer needed
    - Release the value-to-index map in `SegmentDictionaryCreator` early, after indexing is done
    - Clean up some unnecessary operations
---
 .../pinot/perf/BenchmarkDictionaryCreation.java    |  29 ++---
 .../creator/impl/SegmentColumnarIndexCreator.java  |  14 +-
 .../creator/impl/SegmentDictionaryCreator.java     | 141 +++++++++------------
 .../impl/SegmentIndexCreationDriverImpl.java       |   2 +
 .../stats/AbstractColumnStatisticsCollector.java   |  61 ++++-----
 .../BigDecimalColumnPreIndexStatsCollector.java    |  16 ++-
 .../stats/BytesColumnPredIndexStatsCollector.java  |  14 +-
 .../stats/DoubleColumnPreIndexStatsCollector.java  |  22 ++--
 .../stats/FloatColumnPreIndexStatsCollector.java   |  22 ++--
 .../stats/IntColumnPreIndexStatsCollector.java     |  22 ++--
 .../stats/LongColumnPreIndexStatsCollector.java    |  22 ++--
 .../stats/SegmentPreIndexStatsCollectorImpl.java   |   4 +-
 .../stats/StringColumnPreIndexStatsCollector.java  |  16 ++-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   9 +-
 .../local/segment/creator/DictionariesTest.java    |  22 ++--
 .../index/readers/ImmutableDictionaryTest.java     |  34 ++---
 .../ImmutableDictionaryTypeConversionTest.java     |  31 ++---
 17 files changed, 229 insertions(+), 252 deletions(-)

diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
index b86fb1bad5..5bc4de98e6 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
@@ -75,8 +75,8 @@ public class BenchmarkDictionaryCreation {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkIntDictionaryCreation()
       throws IOException {
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_sortedInts, INT_FIELD, INDEX_DIR)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(INT_FIELD, INDEX_DIR)) {
+      dictionaryCreator.build(_sortedInts);
       return dictionaryCreator.indexOfSV(0);
     }
   }
@@ -86,9 +86,8 @@ public class BenchmarkDictionaryCreation {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkLongDictionaryCreation()
       throws IOException {
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_sortedLongs, LONG_FIELD,
-        INDEX_DIR)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(LONG_FIELD, INDEX_DIR)) {
+      dictionaryCreator.build(_sortedLongs);
       return dictionaryCreator.indexOfSV(0L);
     }
   }
@@ -98,9 +97,8 @@ public class BenchmarkDictionaryCreation {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkFloatDictionaryCreation()
       throws IOException {
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_sortedFloats, FLOAT_FIELD,
-        INDEX_DIR)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(FLOAT_FIELD, INDEX_DIR)) {
+      dictionaryCreator.build(_sortedFloats);
       return dictionaryCreator.indexOfSV(0f);
     }
   }
@@ -110,9 +108,8 @@ public class BenchmarkDictionaryCreation {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkDoubleDictionaryCreation()
       throws IOException {
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_sortedDoubles, DOUBLE_FIELD,
-        INDEX_DIR)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(DOUBLE_FIELD, INDEX_DIR)) {
+      dictionaryCreator.build(_sortedDoubles);
       return dictionaryCreator.indexOfSV(0d);
     }
   }
@@ -122,9 +119,8 @@ public class BenchmarkDictionaryCreation {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkStringDictionaryCreation()
       throws IOException {
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_sortedStrings, STRING_FIELD,
-        INDEX_DIR)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(STRING_FIELD, INDEX_DIR)) {
+      dictionaryCreator.build(_sortedStrings);
       return dictionaryCreator.indexOfSV("0");
     }
   }
@@ -134,9 +130,8 @@ public class BenchmarkDictionaryCreation {
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public int benchmarkVarLengthStringDictionaryCreation()
       throws IOException {
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_sortedStrings, STRING_FIELD,
-        INDEX_DIR, true)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(STRING_FIELD, INDEX_DIR, true)) {
+      dictionaryCreator.build(_sortedStrings);
       return dictionaryCreator.indexOfSV("0");
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 97def6c383..7357d8bd05 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -223,13 +223,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
         // Initialize dictionary creator
+        // TODO: Dictionary creator holds all unique values on heap. Consider keeping dictionary instead of creator
+        //       which uses off-heap memory.
         SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(columnIndexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
-                columnIndexCreationInfo.isUseVarLengthDictionary());
+            new SegmentDictionaryCreator(fieldSpec, _indexDir, columnIndexCreationInfo.isUseVarLengthDictionary());
         _dictionaryCreatorMap.put(columnName, dictionaryCreator);
         // Create dictionary
         try {
-          dictionaryCreator.build();
+          dictionaryCreator.build(columnIndexCreationInfo.getSortedUniqueElementsArray());
         } catch (Exception e) {
           LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
               fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(),
@@ -239,7 +240,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       }
 
       if (bloomFilterColumns.contains(columnName)) {
-        if (indexingConfig != null && indexingConfig.getBloomFilterConfigs() != null
+        if (indexingConfig.getBloomFilterConfigs() != null
             && indexingConfig.getBloomFilterConfigs().containsKey(columnName)) {
           _bloomFilterCreatorMap.put(columnName, _indexCreatorProvider.newBloomFilterCreator(
               context.forBloomFilter(indexingConfig.getBloomFilterConfigs().get(columnName))));
@@ -663,6 +664,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   @Override
   public void seal()
       throws ConfigurationException, IOException {
+    for (SegmentDictionaryCreator dictionaryCreator : _dictionaryCreatorMap.values()) {
+      dictionaryCreator.postIndexingCleanup();
+    }
     for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap.values()) {
       invertedIndexCreator.seal();
     }
@@ -779,7 +783,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(column);
       int dictionaryElementSize = (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
       addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs, _schema.getFieldSpecFor(column),
-          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize);
+          dictionaryCreator != null, dictionaryElementSize);
     }
 
     SegmentZKPropsConfig segmentZKPropsConfig = _config.getSegmentZKPropsConfig();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
index 1dff12e765..2805975596 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentDictionaryCreator.java
@@ -47,7 +47,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 public class SegmentDictionaryCreator implements Closeable {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDictionaryCreator.class);
 
-  private final Object _sortedValues;
   private final String _columnName;
   private final DataType _storedType;
   private final File _dictionaryFile;
@@ -57,40 +56,34 @@ public class SegmentDictionaryCreator implements Closeable {
   private Long2IntOpenHashMap _longValueToIndexMap;
   private Float2IntOpenHashMap _floatValueToIndexMap;
   private Double2IntOpenHashMap _doubleValueToIndexMap;
-  private Object2IntOpenHashMap<BigDecimal> _bigDecimalValueToIndexMap;
-  private Object2IntOpenHashMap<String> _stringValueToIndexMap;
-  private Object2IntOpenHashMap<ByteArray> _bytesValueToIndexMap;
+  private Object2IntOpenHashMap<Object> _objectValueToIndexMap;
   private int _numBytesPerEntry = 0;
 
-  public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir,
-      boolean useVarLengthDictionary)
-      throws IOException {
-    _sortedValues = sortedValues;
+  public SegmentDictionaryCreator(FieldSpec fieldSpec, File indexDir, boolean useVarLengthDictionary) {
     _columnName = fieldSpec.getName();
     _storedType = fieldSpec.getDataType().getStoredType();
     _dictionaryFile = new File(indexDir, _columnName + V1Constants.Dict.FILE_EXTENSION);
-    FileUtils.touch(_dictionaryFile);
     _useVarLengthDictionary = useVarLengthDictionary;
   }
 
-  public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir)
-      throws IOException {
-    this(sortedValues, fieldSpec, indexDir, false);
+  public SegmentDictionaryCreator(FieldSpec fieldSpec, File indexDir) {
+    this(fieldSpec, indexDir, false);
   }
 
-  public void build()
+  public void build(Object sortedValues)
       throws IOException {
+    FileUtils.touch(_dictionaryFile);
+
     switch (_storedType) {
       case INT:
-        int[] sortedInts = (int[]) _sortedValues;
+        int[] sortedInts = (int[]) sortedValues;
         int numValues = sortedInts.length;
         Preconditions.checkState(numValues > 0);
         _intValueToIndexMap = new Int2IntOpenHashMap(numValues);
 
         // Backward-compatible: index file is always big-endian
-        try (PinotDataBuffer dataBuffer = PinotDataBuffer
-            .mapFile(_dictionaryFile, false, 0, (long) numValues * Integer.BYTES, ByteOrder.BIG_ENDIAN,
-                getClass().getSimpleName());
+        try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapFile(_dictionaryFile, false, 0,
+            (long) numValues * Integer.BYTES, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
             FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
           for (int i = 0; i < numValues; i++) {
             int value = sortedInts[i];
@@ -98,21 +91,19 @@ public class SegmentDictionaryCreator implements Closeable {
             writer.writeInt(i, value);
           }
         }
-        LOGGER
-            .info("Created dictionary for INT column: {} with cardinality: {}, range: {} to {}", _columnName, numValues,
-                sortedInts[0], sortedInts[numValues - 1]);
+        LOGGER.info("Created dictionary for INT column: {} with cardinality: {}, range: {} to {}", _columnName,
+            numValues, sortedInts[0], sortedInts[numValues - 1]);
         return;
 
       case LONG:
-        long[] sortedLongs = (long[]) _sortedValues;
+        long[] sortedLongs = (long[]) sortedValues;
         numValues = sortedLongs.length;
         Preconditions.checkState(numValues > 0);
         _longValueToIndexMap = new Long2IntOpenHashMap(numValues);
 
         // Backward-compatible: index file is always big-endian
-        try (PinotDataBuffer dataBuffer = PinotDataBuffer
-            .mapFile(_dictionaryFile, false, 0, (long) numValues * Long.BYTES, ByteOrder.BIG_ENDIAN,
-                getClass().getSimpleName());
+        try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapFile(_dictionaryFile, false, 0,
+            (long) numValues * Long.BYTES, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
             FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
           for (int i = 0; i < numValues; i++) {
             long value = sortedLongs[i];
@@ -125,15 +116,14 @@ public class SegmentDictionaryCreator implements Closeable {
         return;
 
       case FLOAT:
-        float[] sortedFloats = (float[]) _sortedValues;
+        float[] sortedFloats = (float[]) sortedValues;
         numValues = sortedFloats.length;
         Preconditions.checkState(numValues > 0);
         _floatValueToIndexMap = new Float2IntOpenHashMap(numValues);
 
         // Backward-compatible: index file is always big-endian
-        try (PinotDataBuffer dataBuffer = PinotDataBuffer
-            .mapFile(_dictionaryFile, false, 0, (long) numValues * Float.BYTES, ByteOrder.BIG_ENDIAN,
-                getClass().getSimpleName());
+        try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapFile(_dictionaryFile, false, 0,
+            (long) numValues * Float.BYTES, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
             FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
           for (int i = 0; i < numValues; i++) {
             float value = sortedFloats[i];
@@ -146,15 +136,14 @@ public class SegmentDictionaryCreator implements Closeable {
         return;
 
       case DOUBLE:
-        double[] sortedDoubles = (double[]) _sortedValues;
+        double[] sortedDoubles = (double[]) sortedValues;
         numValues = sortedDoubles.length;
         Preconditions.checkState(numValues > 0);
         _doubleValueToIndexMap = new Double2IntOpenHashMap(numValues);
 
         // Backward-compatible: index file is always big-endian
-        try (PinotDataBuffer dataBuffer = PinotDataBuffer
-            .mapFile(_dictionaryFile, false, 0, (long) numValues * Double.BYTES, ByteOrder.BIG_ENDIAN,
-                getClass().getSimpleName());
+        try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapFile(_dictionaryFile, false, 0,
+            (long) numValues * Double.BYTES, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
             FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
           for (int i = 0; i < numValues; i++) {
             double value = sortedDoubles[i];
@@ -167,36 +156,38 @@ public class SegmentDictionaryCreator implements Closeable {
         return;
 
       case BIG_DECIMAL:
-        BigDecimal[] sortedBigDecimals = (BigDecimal[]) _sortedValues;
+        BigDecimal[] sortedBigDecimals = (BigDecimal[]) sortedValues;
         numValues = sortedBigDecimals.length;
-
         Preconditions.checkState(numValues > 0);
-        _bigDecimalValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
+        _objectValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
 
+        // Get the maximum length of all entries
+        byte[][] sortedBigDecimalBytes = new byte[numValues][];
         for (int i = 0; i < numValues; i++) {
           BigDecimal value = sortedBigDecimals[i];
-          _bigDecimalValueToIndexMap.put(value, i);
-          _numBytesPerEntry = Math.max(_numBytesPerEntry, BigDecimalUtils.byteSize(value));
+          _objectValueToIndexMap.put(value, i);
+          byte[] valueBytes = BigDecimalUtils.serialize(value);
+          sortedBigDecimalBytes[i] = valueBytes;
+          _numBytesPerEntry = Math.max(_numBytesPerEntry, valueBytes.length);
         }
 
-        writeBytesValueDictionary(sortedBigDecimals);
-        LOGGER.info(
-            "Created dictionary for BIG_DECIMAL column: {}"
-                + " with cardinality: {}, max length in bytes: {}, range: {} to {}",
-            _columnName, numValues, _numBytesPerEntry, sortedBigDecimals[0], sortedBigDecimals[numValues - 1]);
+        writeBytesValueDictionary(sortedBigDecimalBytes);
+        LOGGER.info("Created dictionary for BIG_DECIMAL column: {} with cardinality: {}, max length in bytes: {}, "
+                + "range: {} to {}", _columnName, numValues, _numBytesPerEntry, sortedBigDecimals[0],
+            sortedBigDecimals[numValues - 1]);
         return;
 
       case STRING:
-        String[] sortedStrings = (String[]) _sortedValues;
+        String[] sortedStrings = (String[]) sortedValues;
         numValues = sortedStrings.length;
         Preconditions.checkState(numValues > 0);
-        _stringValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
+        _objectValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
 
         // Get the maximum length of all entries
         byte[][] sortedStringBytes = new byte[numValues][];
         for (int i = 0; i < numValues; i++) {
           String value = sortedStrings[i];
-          _stringValueToIndexMap.put(value, i);
+          _objectValueToIndexMap.put(value, i);
           byte[] valueBytes = value.getBytes(UTF_8);
           sortedStringBytes[i] = valueBytes;
           _numBytesPerEntry = Math.max(_numBytesPerEntry, valueBytes.length);
@@ -209,17 +200,17 @@ public class SegmentDictionaryCreator implements Closeable {
         return;
 
       case BYTES:
-        ByteArray[] sortedBytes = (ByteArray[]) _sortedValues;
+        ByteArray[] sortedBytes = (ByteArray[]) sortedValues;
         numValues = sortedBytes.length;
-
         Preconditions.checkState(numValues > 0);
-        _bytesValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
+        _objectValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
 
-        byte[][] sortedByteArrays = new byte[sortedBytes.length][];
+        // Get the maximum length of all entries
+        byte[][] sortedByteArrays = new byte[numValues][];
         for (int i = 0; i < numValues; i++) {
           ByteArray value = sortedBytes[i];
           sortedByteArrays[i] = value.getBytes();
-          _bytesValueToIndexMap.put(value, i);
+          _objectValueToIndexMap.put(value, i);
           _numBytesPerEntry = Math.max(_numBytesPerEntry, value.getBytes().length);
         }
 
@@ -252,9 +243,8 @@ public class SegmentDictionaryCreator implements Closeable {
     } else {
       // Backward-compatible: index file is always big-endian
       int numValues = bytesValues.length;
-      try (PinotDataBuffer dataBuffer = PinotDataBuffer
-          .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
-              getClass().getSimpleName());
+      try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapFile(_dictionaryFile, false, 0,
+          (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
           FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
         for (int i = 0; i < bytesValues.length; i++) {
           writer.writeBytes(i, _numBytesPerEntry, bytesValues[i]);
@@ -265,31 +255,6 @@ public class SegmentDictionaryCreator implements Closeable {
     }
   }
 
-  private void writeBytesValueDictionary(BigDecimal[] bigDecimalValues)
-      throws IOException {
-    if (_useVarLengthDictionary) {
-      try (VarLengthValueWriter writer = new VarLengthValueWriter(_dictionaryFile, bigDecimalValues.length)) {
-        for (BigDecimal value : bigDecimalValues) {
-          writer.add(BigDecimalUtils.serialize(value));
-        }
-      }
-      LOGGER.info("Using variable length dictionary for column: {}, size: {}", _columnName, _dictionaryFile.length());
-    } else {
-      // Backward-compatible: index file is always big-endian
-      int numValues = bigDecimalValues.length;
-      try (PinotDataBuffer dataBuffer = PinotDataBuffer
-          .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
-              getClass().getSimpleName());
-          FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
-        for (int i = 0; i < bigDecimalValues.length; i++) {
-          writer.writeBytes(i, _numBytesPerEntry, BigDecimalUtils.serialize(bigDecimalValues[i]));
-        }
-      }
-      LOGGER.info("Using fixed length dictionary for column: {}, size: {}", _columnName,
-          (long) numValues * _numBytesPerEntry);
-    }
-  }
-
   public int getNumBytesPerEntry() {
     return _numBytesPerEntry;
   }
@@ -305,11 +270,10 @@ public class SegmentDictionaryCreator implements Closeable {
       case DOUBLE:
         return _doubleValueToIndexMap.get((double) value);
       case STRING:
-        return _stringValueToIndexMap.getInt(value);
       case BIG_DECIMAL:
-        return _bigDecimalValueToIndexMap.getInt((BigDecimal) value);
+        return _objectValueToIndexMap.getInt(value);
       case BYTES:
-        return _bytesValueToIndexMap.get(new ByteArray((byte[]) value));
+        return _objectValueToIndexMap.getInt(new ByteArray((byte[]) value));
       default:
         throw new UnsupportedOperationException("Unsupported data type : " + _storedType);
     }
@@ -342,12 +306,12 @@ public class SegmentDictionaryCreator implements Closeable {
         break;
       case STRING:
         for (int i = 0; i < multiValues.length; i++) {
-          indexes[i] = _stringValueToIndexMap.getInt(multiValues[i]);
+          indexes[i] = _objectValueToIndexMap.getInt(multiValues[i]);
         }
         break;
       case BYTES:
         for (int i = 0; i < multiValues.length; i++) {
-          indexes[i] = _bytesValueToIndexMap.get(new ByteArray((byte[]) multiValues[i]));
+          indexes[i] = _objectValueToIndexMap.getInt(new ByteArray((byte[]) multiValues[i]));
         }
         break;
       default:
@@ -356,6 +320,17 @@ public class SegmentDictionaryCreator implements Closeable {
     return indexes;
   }
 
+  /**
+   * Cleans up the no longer needed objects after all the indexing is done to free up some memory.
+   */
+  public void postIndexingCleanup() {
+    _intValueToIndexMap = null;
+    _longValueToIndexMap = null;
+    _floatValueToIndexMap = null;
+    _doubleValueToIndexMap = null;
+    _objectValueToIndexMap = null;
+  }
+
   @Override
   public void close() {
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index dc00e4ce84..18a50dbf0b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -203,6 +203,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
     try {
       // Initialize the index creation using the per-column statistics information
+      // TODO: _indexCreationInfoMap holds the reference to all unique values on heap (ColumnIndexCreationInfo ->
+      //       ColumnStatistics) throughout the segment creation. Find a way to release the memory early.
       _indexCreator.init(_config, _segmentIndexCreationInfo, _indexCreationInfoMap, _dataSchema, _tempIndexDir);
 
       // Build the index
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 85e79e181b..72f8eabb74 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
+import com.google.common.base.Preconditions;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -39,32 +40,34 @@ import org.apache.pinot.spi.data.FieldSpec;
  * compute max
  * see if column isSorted
  */
+@SuppressWarnings({"rawtypes", "unchecked"})
 public abstract class AbstractColumnStatisticsCollector implements ColumnStatistics {
   protected static final int INITIAL_HASH_SET_SIZE = 1000;
-  private Object _previousValue = null;
   protected final FieldSpec _fieldSpec;
-  protected boolean _isSorted = true;
-  private final String _column;
 
-  protected int _totalNumberOfEntries = 0;
-  protected int _maxNumberOfMultiValues = 0;
-  protected int _maxLengthOfMultiValues = 0;
-  private PartitionFunction _partitionFunction;
+  private final Map<String, String> _partitionFunctionConfig;
+  private final PartitionFunction _partitionFunction;
   private final int _numPartitions;
-  private final Map<String, String> _functionConfig;
   private final Set<Integer> _partitions;
 
+  protected int _totalNumberOfEntries = 0;
+  protected int _maxNumberOfMultiValues = 0;
+  protected boolean _sorted = true;
+  private Comparable _previousValue = null;
+
   public AbstractColumnStatisticsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
-    _column = column;
     _fieldSpec = statsCollectorConfig.getFieldSpecForColumn(column);
+    Preconditions.checkArgument(_fieldSpec != null, "Failed to find column: %s", column);
+    if (!_fieldSpec.isSingleValueField()) {
+      _sorted = false;
+    }
 
     String partitionFunctionName = statsCollectorConfig.getPartitionFunctionName(column);
-    int numPartitions = statsCollectorConfig.getNumPartitions(column);
-    _functionConfig = statsCollectorConfig.getPartitionFunctionConfig(column);
-    _partitionFunction = (partitionFunctionName != null) ? PartitionFunctionFactory
-        .getPartitionFunction(partitionFunctionName, numPartitions, _functionConfig) : null;
-
     _numPartitions = statsCollectorConfig.getNumPartitions(column);
+    _partitionFunctionConfig = statsCollectorConfig.getPartitionFunctionConfig(column);
+    _partitionFunction =
+        (partitionFunctionName != null) ? PartitionFunctionFactory.getPartitionFunction(partitionFunctionName,
+            _numPartitions, _partitionFunctionConfig) : null;
     if (_partitionFunction != null) {
       _partitions = new HashSet<>();
     } else {
@@ -76,28 +79,16 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     return _maxNumberOfMultiValues;
   }
 
-  public int getMaxLengthOfMultiValues() {
-    return _maxLengthOfMultiValues;
-  }
-
-  void addressSorted(Object entry) {
-    if (_isSorted) {
-      if (_previousValue != null) {
-        if (!entry.equals(_previousValue) && _previousValue != null) {
-          final Comparable prevValue = (Comparable) _previousValue;
-          final Comparable origin = (Comparable) entry;
-          if (origin.compareTo(prevValue) < 0) {
-            _isSorted = false;
-          }
-        }
-      }
+  protected void addressSorted(Comparable entry) {
+    if (_sorted) {
+      _sorted = _previousValue == null || entry.compareTo(_previousValue) >= 0;
       _previousValue = entry;
     }
   }
 
   @Override
   public boolean isSorted() {
-    return _fieldSpec.isSingleValueField() && _isSorted;
+    return _sorted;
   }
 
   /**
@@ -105,14 +96,6 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
    */
   public abstract void collect(Object entry);
 
-  public abstract Object getMinValue();
-
-  public abstract Object getMaxValue();
-
-  public abstract Object getUniqueValuesSet();
-
-  public abstract int getCardinality();
-
   public int getLengthOfShortestElement() {
     return -1;
   }
@@ -147,7 +130,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
    */
   @Nullable
   public Map<String, String> getPartitionFunctionConfig() {
-    return _functionConfig;
+    return _partitionFunctionConfig;
   }
 
   /**
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
index e8d76bb25a..1c4b7913ed 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
@@ -21,7 +21,6 @@ package org.apache.pinot.segment.local.segment.creator.impl.stats;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.math.BigDecimal;
 import java.util.Arrays;
-import java.util.Set;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
 
@@ -30,21 +29,21 @@ import org.apache.pinot.spi.utils.BigDecimalUtils;
  * Extension of {@link AbstractColumnStatisticsCollector} for BigDecimal column type.
  */
 public class BigDecimalColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final Set<BigDecimal> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
-
+  private ObjectOpenHashSet<BigDecimal> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private int _maxRowLength = 0;
   private BigDecimal[] _sortedValues;
   private boolean _sealed = false;
 
-  // todo: remove this class if not needed.
   public BigDecimalColumnPreIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
     super(column, statsCollectorConfig);
   }
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       throw new UnsupportedOperationException();
     } else {
@@ -113,8 +112,11 @@ public class BigDecimalColumnPreIndexStatsCollector extends AbstractColumnStatis
 
   @Override
   public void seal() {
-    _sortedValues = _values.toArray(new BigDecimal[0]);
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toArray(new BigDecimal[0]);
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index b003b029c7..90732d1acc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -29,8 +29,7 @@ import org.apache.pinot.spi.utils.ByteArray;
  * Extension of {@link AbstractColumnStatisticsCollector} for byte[] column type.
  */
 public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final Set<ByteArray> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
-
+  private Set<ByteArray> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private int _maxRowLength = 0;
@@ -43,6 +42,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
       int rowLength = 0;
@@ -123,8 +124,11 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void seal() {
-    _sortedValues = _values.toArray(new ByteArray[0]);
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toArray(new ByteArray[0]);
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
index 4788851407..2a80f353b8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
@@ -25,8 +25,7 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 
 
 public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final DoubleSet _values = new DoubleOpenHashSet(INITIAL_HASH_SET_SIZE);
-
+  private DoubleSet _values = new DoubleOpenHashSet(INITIAL_HASH_SET_SIZE);
   private double[] _sortedValues;
   private boolean _sealed = false;
   private double _prevValue = Double.NEGATIVE_INFINITY;
@@ -37,6 +36,8 @@ public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
       for (Object obj : values) {
@@ -57,11 +58,9 @@ public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatistics
     }
   }
 
-  void addressSorted(double entry) {
-    if (_isSorted) {
-      if (entry < _prevValue) {
-        _isSorted = false;
-      }
+  private void addressSorted(double entry) {
+    if (_sorted) {
+      _sorted = entry >= _prevValue;
       _prevValue = entry;
     }
   }
@@ -100,8 +99,11 @@ public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void seal() {
-    _sortedValues = _values.toDoubleArray();
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toDoubleArray();
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
index d4b19d51aa..09051c6517 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
@@ -25,8 +25,7 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 
 
 public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final FloatSet _values = new FloatOpenHashSet(INITIAL_HASH_SET_SIZE);
-
+  private FloatSet _values = new FloatOpenHashSet(INITIAL_HASH_SET_SIZE);
   private float[] _sortedValues;
   private boolean _sealed = false;
   private float _prevValue = Float.NEGATIVE_INFINITY;
@@ -37,6 +36,8 @@ public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsC
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
       for (Object obj : values) {
@@ -57,11 +58,9 @@ public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsC
     }
   }
 
-  void addressSorted(float entry) {
-    if (_isSorted) {
-      if (entry < _prevValue) {
-        _isSorted = false;
-      }
+  private void addressSorted(float entry) {
+    if (_sorted) {
+      _sorted = entry >= _prevValue;
       _prevValue = entry;
     }
   }
@@ -100,8 +99,11 @@ public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsC
 
   @Override
   public void seal() {
-    _sortedValues = _values.toFloatArray();
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toFloatArray();
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
index eeee934c1b..d65429e2d7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
@@ -25,8 +25,7 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 
 
 public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final IntSet _values = new IntOpenHashSet(INITIAL_HASH_SET_SIZE);
-
+  private IntSet _values = new IntOpenHashSet(INITIAL_HASH_SET_SIZE);
   private int[] _sortedValues;
   private boolean _sealed = false;
   private int _prevValue = Integer.MIN_VALUE;
@@ -37,6 +36,8 @@ public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
       for (Object obj : values) {
@@ -57,11 +58,9 @@ public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol
     }
   }
 
-  void addressSorted(int entry) {
-    if (_isSorted) {
-      if (entry < _prevValue) {
-        _isSorted = false;
-      }
+  private void addressSorted(int entry) {
+    if (_sorted) {
+      _sorted = entry >= _prevValue;
       _prevValue = entry;
     }
   }
@@ -100,8 +99,11 @@ public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol
 
   @Override
   public void seal() {
-    _sortedValues = _values.toIntArray();
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toIntArray();
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
index d2dc7a5519..39cb4fa54b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
@@ -25,8 +25,7 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 
 
 public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final LongSet _values = new LongOpenHashSet(INITIAL_HASH_SET_SIZE);
-
+  private LongSet _values = new LongOpenHashSet(INITIAL_HASH_SET_SIZE);
   private long[] _sortedValues;
   private boolean _sealed = false;
   private long _prevValue = Long.MIN_VALUE;
@@ -37,6 +36,8 @@ public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCo
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
       for (Object obj : values) {
@@ -57,11 +58,9 @@ public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCo
     }
   }
 
-  void addressSorted(long entry) {
-    if (_isSorted) {
-      if (entry < _prevValue) {
-        _isSorted = false;
-      }
+  private void addressSorted(long entry) {
+    if (_sorted) {
+      _sorted = entry >= _prevValue;
       _prevValue = entry;
     }
   }
@@ -100,8 +99,11 @@ public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCo
 
   @Override
   public void seal() {
-    _sortedValues = _values.toLongArray();
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toLongArray();
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
index 0ac7e4f7a4..d6436d22da 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/SegmentPreIndexStatsCollectorImpl.java
@@ -79,8 +79,8 @@ public class SegmentPreIndexStatsCollectorImpl implements SegmentPreIndexStatsCo
 
   @Override
   public void build() {
-    for (final String column : _columnStatsCollectorMap.keySet()) {
-      _columnStatsCollectorMap.get(column).seal();
+    for (AbstractColumnStatisticsCollector columnStatsCollector : _columnStatsCollectorMap.values()) {
+      columnStatsCollector.seal();
     }
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index ec3f31b17e..980fcd5ba5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -19,16 +19,15 @@
 package org.apache.pinot.segment.local.segment.creator.impl.stats;
 
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
-import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.util.Arrays;
+import java.util.Set;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 
 public class StringColumnPreIndexStatsCollector extends AbstractColumnStatisticsCollector {
-  private final ObjectSet<String> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
-
+  private Set<String> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
   private int _minLength = Integer.MAX_VALUE;
   private int _maxLength = 0;
   private int _maxRowLength = 0;
@@ -41,6 +40,8 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void collect(Object entry) {
+    assert !_sealed;
+
     if (entry instanceof Object[]) {
       Object[] values = (Object[]) entry;
       int rowLength = 0;
@@ -118,8 +119,11 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
 
   @Override
   public void seal() {
-    _sortedValues = _values.toArray(new String[0]);
-    Arrays.sort(_sortedValues);
-    _sealed = true;
+    if (!_sealed) {
+      _sortedValues = _values.toArray(new String[0]);
+      _values = null;
+      Arrays.sort(_sortedValues);
+      _sealed = true;
+    }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 892ad8f9b5..cdbfababa9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -447,8 +447,8 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
 
     // Create dictionary.
     // We will have only one value in the dictionary.
-    try (SegmentDictionaryCreator creator = new SegmentDictionaryCreator(sortedArray, fieldSpec, _indexDir, false)) {
-      creator.build();
+    try (SegmentDictionaryCreator creator = new SegmentDictionaryCreator(fieldSpec, _indexDir, false)) {
+      creator.build(sortedArray);
     }
 
     // Create forward index.
@@ -640,10 +640,9 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
       }
 
       // Create dictionary
-      try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
-          indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
+      try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(fieldSpec, _indexDir,
           indexCreationInfo.isUseVarLengthDictionary())) {
-        dictionaryCreator.build();
+        dictionaryCreator.build(indexCreationInfo.getSortedUniqueElementsArray());
 
         // Create forward index
         int cardinality = indexCreationInfo.getDistinctValueCount();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
index 8f80819b06..893f1db85e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/DictionariesTest.java
@@ -92,9 +92,9 @@ public class DictionariesTest {
       FileUtils.deleteQuietly(INDEX_DIR);
     }
 
-    final SegmentGeneratorConfig config = SegmentTestUtils
-        .getSegmentGenSpecWithSchemAndProjectedColumns(new File(filePath), INDEX_DIR, "time_day", TimeUnit.DAYS,
-            "test");
+    final SegmentGeneratorConfig config =
+        SegmentTestUtils.getSegmentGenSpecWithSchemAndProjectedColumns(new File(filePath), INDEX_DIR, "time_day",
+            TimeUnit.DAYS, "test");
     _tableConfig = config.getTableConfig();
 
     // The segment generation code in SegmentColumnarIndexCreator will throw
@@ -335,8 +335,7 @@ public class DictionariesTest {
 
   @Test
   public void testBigDecimalColumnPreIndexStatsCollector() {
-    AbstractColumnStatisticsCollector statsCollector =
-        buildStatsCollector("column1", DataType.BIG_DECIMAL, false);
+    AbstractColumnStatisticsCollector statsCollector = buildStatsCollector("column1", DataType.BIG_DECIMAL, false);
     statsCollector.collect(BigDecimal.valueOf(1d));
     Assert.assertTrue(statsCollector.isSorted());
     statsCollector.collect(BigDecimal.valueOf(2d));
@@ -458,11 +457,11 @@ public class DictionariesTest {
         new String(new byte[]{67, -61, -76, 116, 101, 32, 100, 39, 73, 118, 111, 105, 114, 101}); // "Côte d'Ivoire";
     Arrays.sort(inputStrings);
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(inputStrings, fieldSpec, indexDir)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(fieldSpec, indexDir)) {
+      dictionaryCreator.build(inputStrings);
       for (String inputString : inputStrings) {
-        Assert
-            .assertTrue(dictionaryCreator.indexOfSV(inputString) >= 0, "Value not found in dictionary " + inputString);
+        Assert.assertTrue(dictionaryCreator.indexOfSV(inputString) >= 0,
+            "Value not found in dictionary " + inputString);
       }
     }
 
@@ -479,9 +478,8 @@ public class DictionariesTest {
     indexDir.deleteOnExit();
     FieldSpec fieldSpec = new DimensionFieldSpec("test", DataType.STRING, true);
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(new String[]{""}, fieldSpec,
-        indexDir)) {
-      dictionaryCreator.build();
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(fieldSpec, indexDir)) {
+      dictionaryCreator.build(new String[]{""});
       Assert.assertEquals(dictionaryCreator.getNumBytesPerEntry(), 0);
       Assert.assertEquals(dictionaryCreator.indexOfSV(""), 0);
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
index 943c841ef8..4344e39970 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTest.java
@@ -131,42 +131,44 @@ public class ImmutableDictionaryTest {
     _bytesValues = bytesSet.toArray(new ByteArray[NUM_VALUES]);
     Arrays.sort(_bytesValues);
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_intValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(INT_COLUMN_NAME, DataType.INT, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_intValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_longValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(LONG_COLUMN_NAME, DataType.LONG, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_longValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_floatValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(FLOAT_COLUMN_NAME, DataType.FLOAT, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_floatValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_doubleValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(DOUBLE_COLUMN_NAME, DataType.DOUBLE, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_doubleValues);
     }
 
     // Note: BigDecimalDictionary requires setting useVarLengthDictionary to true.
     boolean useVarLengthDictionary = true;
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_bigDecimalValues,
-        new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, DataType.BIG_DECIMAL), TEMP_DIR, useVarLengthDictionary)) {
-      dictionaryCreator.build();
+    MetricFieldSpec bigDecimalMetricField = new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, DataType.BIG_DECIMAL);
+    bigDecimalMetricField.setSingleValueField(true);
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(bigDecimalMetricField, TEMP_DIR,
+        useVarLengthDictionary)) {
+      dictionaryCreator.build(_bigDecimalValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_stringValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(STRING_COLUMN_NAME, DataType.STRING, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_stringValues);
       _numBytesPerStringValue = dictionaryCreator.getNumBytesPerEntry();
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_bytesValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(BYTES_COLUMN_NAME, DataType.BYTES, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_bytesValues);
       assertEquals(dictionaryCreator.getNumBytesPerEntry(), BYTES_LENGTH);
     }
   }
@@ -195,7 +197,7 @@ public class ImmutableDictionaryTest {
     for (int i = 0; i < NUM_VALUES; i++) {
       assertEquals(intDictionary.get(i), _intValues[i]);
       assertEquals(intDictionary.getIntValue(i), _intValues[i]);
-      assertEquals(intDictionary.getLongValue(i), (long) _intValues[i]);
+      assertEquals(intDictionary.getLongValue(i), _intValues[i]);
       assertEquals(intDictionary.getFloatValue(i), (float) _intValues[i]);
       assertEquals(intDictionary.getDoubleValue(i), (double) _intValues[i]);
       Assert.assertEquals(Integer.parseInt(intDictionary.getStringValue(i)), _intValues[i]);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
index b89c0cdd6a..56b31dea69 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/readers/ImmutableDictionaryTypeConversionTest.java
@@ -112,40 +112,41 @@ public class ImmutableDictionaryTypeConversionTest {
       _bytesValues[i] = BytesUtils.toByteArray(_stringValues[i]);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_intValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(INT_COLUMN_NAME, DataType.INT, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_intValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_longValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(LONG_COLUMN_NAME, DataType.LONG, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_longValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_floatValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(FLOAT_COLUMN_NAME, DataType.FLOAT, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_floatValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_doubleValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(DOUBLE_COLUMN_NAME, DataType.DOUBLE, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_doubleValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_bigDecimalValues,
-        new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, DataType.BIG_DECIMAL), TEMP_DIR)) {
-      dictionaryCreator.build();
+    MetricFieldSpec bigDecimalMetricField = new MetricFieldSpec(BIG_DECIMAL_COLUMN_NAME, DataType.BIG_DECIMAL);
+    bigDecimalMetricField.setSingleValueField(true);
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(bigDecimalMetricField, TEMP_DIR)) {
+      dictionaryCreator.build(_bigDecimalValues);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_stringValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(STRING_COLUMN_NAME, DataType.STRING, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_stringValues);
       assertEquals(dictionaryCreator.getNumBytesPerEntry(), STRING_LENGTH);
     }
 
-    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(_bytesValues,
+    try (SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(
         new DimensionFieldSpec(BYTES_COLUMN_NAME, DataType.BYTES, true), TEMP_DIR)) {
-      dictionaryCreator.build();
+      dictionaryCreator.build(_bytesValues);
       assertEquals(dictionaryCreator.getNumBytesPerEntry(), BYTES_LENGTH);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org