You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2019/07/06 03:41:10 UTC

[incubator-pinot] branch master updated: #4317 Feature/variable length bytes offline dictionary for indexing bytes and string dicts. (#4321)

This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d8fa7  #4317 Feature/variable length bytes offline dictionary for indexing bytes and string dicts. (#4321)
49d8fa7 is described below

commit 49d8fa7907efc4192c3ecd5fef67c33e657c39f5
Author: Buchi Reddy Busi Reddy <ma...@gmail.com>
AuthorDate: Fri Jul 5 20:41:04 2019 -0700

    #4317 Feature/variable length bytes offline dictionary for indexing bytes and string dicts. (#4321)
    
    * Adding a variable length bytes value reader for offline dictionary index.
    
    * Make variables private.
    
    * Cleaned up the changes a bit and removed the hacks to use the
    variable length byte dictionary.
    
    In the next iteration, we'll be deciding dynamically whether to use
    fixed or variable length impl.
    
    * Minor cleanup.
    
    * Adding stats about the string values length in the StringDictionaryPerfTest.java
    
    * Improved the magic header for VariableLength dictionary.
    
    Also fixed other review comments from Kishore.
    
    * Making the feature of whether to use variable length dictionary
    for a column configurable through TableConfig.
    
    * Use the variable length dictionary config in realtime segments path too.
    
    * Fixed a bug that was caught by the unit tests.
    
    * Adding variable length dictionary config in more places like:
    - IndexLoadingConfig
    - RealtimeSegmentConfig
    
    * Use separate file names for different tests in a single class. This was
    causing a race condition and unit test failure.
    
    * Dont set null value for varLengthDictionaryColumns
    
    * Fixed review comments.
    
    * Fixed review comments and formatted the code as per Pinot code style.
    
    * Variable length dictionary shouldn't support getPaddedString()
    since we don't expect it to be ever called.
    
    * The way byte[] returned from ImmutableDictionaryReader is reused,
    we can't really reuse it so removing the ThreadLocal byte[].
    
    Thanks to the unit test which caught this issue :thumbsup:
    
    * Explicitly persist the data section offset in the header of the store
    so that we don't need to make any assumptions about where the data starts.
---
 .../apache/pinot/common/config/IndexingConfig.java |  20 +-
 .../pinot/common/config/IndexingConfigTest.java    |   3 +
 .../realtime/HLRealtimeSegmentDataManager.java     |   5 +
 .../realtime/LLRealtimeSegmentDataManager.java     |   6 +-
 .../generator/SegmentGeneratorConfig.java          |  13 ++
 .../org/apache/pinot/core/io/util/ValueReader.java |   3 +-
 .../io/util/VarLengthBytesValueReaderWriter.java   | 240 +++++++++++++++++++++
 .../converter/RealtimeSegmentConverter.java        |  11 +-
 .../core/realtime/impl/RealtimeSegmentConfig.java  |  20 +-
 .../segment/creator/ColumnIndexCreationInfo.java   |  10 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |   3 +-
 .../creator/impl/SegmentDictionaryCreator.java     |  72 +++++--
 .../impl/SegmentIndexCreationDriverImpl.java       |   6 +-
 .../segment/index/loader/IndexLoadingConfig.java   |  11 +
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   8 +-
 .../index/readers/ImmutableDictionaryReader.java   |  18 +-
 .../segment/store/SegmentLocalFSDirectory.java     |   2 +-
 .../MutableSegmentImplAggregateMetricsTest.java    |   1 +
 .../mutable/MutableSegmentImplTest.java            |   3 +-
 .../mutable/MutableSegmentImplTestUtils.java       |   4 +-
 .../util/VarLengthBytesValueReaderWriterTest.java  | 118 ++++++++++
 .../pinot/perf/BenchmarkDictionaryCreation.java    |  12 ++
 .../pinot/perf/StringDictionaryPerfTest.java       |  86 ++++++--
 .../realtime/provisioning/MemoryEstimator.java     |  18 +-
 24 files changed, 626 insertions(+), 67 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java
index 6c0fb65..6172f52 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java
@@ -87,6 +87,14 @@ public class IndexingConfig {
   @ConfigKey("aggregateMetrics")
   private boolean _aggregateMetrics;
 
+  /**
+   * The list of columns for which the variable length dictionary needs to be enabled in offline
+   * segments. This is only valid for string and bytes columns and has no impact for columns of
+   * other data types.
+   */
+  @ConfigKey("varLengthDictionaryColumns")
+  private List<String> _varLengthDictionaryColumns;
+
   public List<String> getInvertedIndexColumns() {
     return _invertedIndexColumns;
   }
@@ -223,6 +231,14 @@ public class IndexingConfig {
     return _aggregateMetrics;
   }
 
+  public List<String> getVarLengthDictionaryColumns() {
+    return _varLengthDictionaryColumns;
+  }
+
+  public void setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
+    _varLengthDictionaryColumns = varLengthDictionaryColumns;
+  }
+
   @Override
   public String toString() {
     final StringBuilder result = new StringBuilder();
@@ -279,7 +295,8 @@ public class IndexingConfig {
         .isEqual(_onHeapDictionaryColumns, that._onHeapDictionaryColumns) && EqualityUtils
         .isEqual(_starTreeIndexSpec, that._starTreeIndexSpec) && EqualityUtils
         .isEqual(_segmentPartitionConfig, that._segmentPartitionConfig) && EqualityUtils
-        .isEqual(_bloomFilterColumns, that._bloomFilterColumns);
+        .isEqual(_bloomFilterColumns, that._bloomFilterColumns) && EqualityUtils
+        .isEqual(_varLengthDictionaryColumns, that._varLengthDictionaryColumns);
   }
 
   @Override
@@ -298,6 +315,7 @@ public class IndexingConfig {
     result = EqualityUtils.hashCodeOf(result, _starTreeIndexSpec);
     result = EqualityUtils.hashCodeOf(result, _segmentPartitionConfig);
     result = EqualityUtils.hashCodeOf(result, _bloomFilterColumns);
+    result = EqualityUtils.hashCodeOf(result, _varLengthDictionaryColumns);
     return result;
   }
 }
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java
index e3d31d5..fd782fe 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java
@@ -54,6 +54,8 @@ public class IndexingConfigTest {
     noDictionaryConfig.put("a", "SNAPPY");
     noDictionaryConfig.put("b", "PASS_THROUGH");
     indexingConfig.setnoDictionaryConfig(noDictionaryConfig);
+    List<String> varLengthDictionaryColumns = Arrays.asList("a", "x", "z");
+    indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
 
     indexingConfig = JsonUtils.stringToObject(JsonUtils.objectToString(indexingConfig), IndexingConfig.class);
 
@@ -64,6 +66,7 @@ public class IndexingConfigTest {
     assertEquals(indexingConfig.getOnHeapDictionaryColumns(), onHeapDictionaryColumns);
     assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
     assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
+    assertEquals(indexingConfig.getVarLengthDictionaryColumns(), varLengthDictionaryColumns);
   }
 
   @Test
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index b8b7f17..d5b223a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -92,6 +92,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String sortedColumn;
   private final List<String> invertedIndexColumns;
   private final List<String> noDictionaryColumns;
+  private final List<String> varLengthDictionaryColumns;
   private Logger segmentLogger = LOGGER;
   private final SegmentVersion _segmentVersion;
 
@@ -145,6 +146,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // No DictionaryColumns
     noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
 
+    varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+
     _streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
 
     segmentLogger = LoggerFactory.getLogger(
@@ -181,6 +184,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setSchema(schema).setCapacity(capacity)
             .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
+            .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(realtimeSegmentZKMetadata)
             .setOffHeap(indexLoadingConfig.isRealtimeOffheapAllocation()).setMemoryManager(
             getMemoryManager(realtimeTableDataManager.getConsumerDir(), segmentName,
@@ -259,6 +263,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
                   tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(),
                   sortedColumn, HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns,
+                  varLengthDictionaryColumns,
                   null/*StarTreeIndexSpec*/); // Star tree not supported for HLC.
 
           segmentLogger.info("Trying to build segment");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 61ed73c..e2cb15c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -237,6 +237,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String _timeColumnName;
   private final List<String> _invertedIndexColumns;
   private final List<String> _noDictionaryColumns;
+  private final List<String> _varLengthDictionaryColumns;
   private final StarTreeIndexSpec _starTreeIndexSpec;
   private final String _sortedColumn;
   private Logger segmentLogger;
@@ -662,7 +663,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       RealtimeSegmentConverter converter =
           new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
               _tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
-              _invertedIndexColumns, _noDictionaryColumns, _starTreeIndexSpec);
+              _invertedIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _starTreeIndexSpec);
       segmentLogger.info("Trying to build segment");
       try {
         converter.build(_segmentVersion, _serverMetrics);
@@ -1088,6 +1089,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // No dictionary Columns
     _noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
 
+    _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+
     // Read the star tree config
     _starTreeIndexSpec = indexingConfig.getStarTreeIndexSpec();
 
@@ -1106,6 +1109,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setSchema(schema).setCapacity(_segmentMaxRowCount)
             .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
+            .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
             .setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index bc176bd..4b4251c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -80,6 +80,7 @@ public class SegmentGeneratorConfig {
   private Map<String, ChunkCompressorFactory.CompressionType> _rawIndexCompressionType = new HashMap<>();
   private List<String> _invertedIndexCreationColumns = new ArrayList<>();
   private List<String> _columnSortOrder = new ArrayList<>();
+  private List<String> _varLengthDictionaryColumns = new ArrayList<>();
   private String _dataDir = null;
   private String _inputFilePath = null;
   private FileFormat _format = FileFormat.AVRO;
@@ -128,6 +129,7 @@ public class SegmentGeneratorConfig {
     _rawIndexCompressionType.putAll(config._rawIndexCompressionType);
     _invertedIndexCreationColumns.addAll(config._invertedIndexCreationColumns);
     _columnSortOrder.addAll(config._columnSortOrder);
+    _varLengthDictionaryColumns.addAll(config._varLengthDictionaryColumns);
     _dataDir = config._dataDir;
     _inputFilePath = config._inputFilePath;
     _format = config._format;
@@ -187,6 +189,9 @@ public class SegmentGeneratorConfig {
         this.setRawIndexCompressionType(serializedNoDictionaryColumnMap);
       }
     }
+    if (indexingConfig.getVarLengthDictionaryColumns() != null) {
+      setVarLengthDictionaryColumns(indexingConfig.getVarLengthDictionaryColumns());
+    }
     _segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
 
     // Star-tree V1 config
@@ -276,6 +281,14 @@ public class SegmentGeneratorConfig {
     _columnSortOrder.addAll(sortOrder);
   }
 
+  public List<String> getVarLengthDictionaryColumns() {
+    return _varLengthDictionaryColumns;
+  }
+
+  public void setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
+    this._varLengthDictionaryColumns = varLengthDictionaryColumns;
+  }
+
   public void createInvertedIndexForColumn(String column) {
     Preconditions.checkNotNull(column);
     if (_schema != null && _schema.getFieldSpecFor(column) == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java
index 644c06b..46340c4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java
@@ -39,6 +39,5 @@ public interface ValueReader {
 
   byte[] getBytes(int index, int numBytesPerValue, byte[] buffer);
 
-  void close()
-      throws IOException;
+  void close() throws IOException;
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
new file mode 100644
index 0000000..45e840b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+/**
+ * An immutable implementation of {@link ValueReader} that will allow each byte[] to be of variable
+ * length and there by avoiding the unnecessary padding. Since this is an immutable data structure,
+ * full data has to be given at initialization time an doesn't have any write() methods.
+ *
+ * The layout of the file is as follows:
+ * <p> Header Section: </p>
+ * <ul>
+ *   <li> Magic bytes: Chose this to be ".vl;" to avoid conflicts with the fixed size
+ *        {@link ValueReader} implementations. By having special characters, this avoids conflicts
+ *        with regular bytes/strings dictionaries.
+ *   </li>
+ *   <li> Version number: This is an integer and can be used for the evolution of the store
+ *        implementation by incrementing version for every incompatible change to the store/format.
+ *   </li>
+ *   <li> Number of elements in the store. </li>
+ *   <li> The offset where the data section starts. Though the data section usually starts right after
+ *        the header, having this explicitly will let the store to be evolved freely without any
+ *        assumptions about where the data section starts.
+ *   </li>
+ * </ul>
+ *
+ * <p> Data section: </p>
+ * <ul>
+ *   <li> Offsets Array: Integer offsets to start position of byte arrays.
+ *        Example: [O(1), O(2),...O(n)] where O is the Offset function. Length of nth element
+ *        is computed as: O(n+1) - O(n). Since the last element's length can't be computed
+ *        using this formula, we store an extra offset at the end to be able to compute last
+ *        element's length with the same formula, without depending on underlying buffer's size.
+ *   </li>
+ *   <li> All byte arrays or values. </li>
+ * </ul>
+ *
+ * @see FixedByteValueReaderWriter
+ */
+public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
+
+  /**
+   * Magic bytes used to identify the dictionary files written in variable length bytes format.
+   */
+  private static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
+
+  /**
+   * Increment this version if there are any structural changes in the store format and
+   * deal with backward compatibility correctly based on old versions.
+   */
+  private static final int VERSION = 1;
+
+  // Offsets of different fields in the header. Having as constants for readability.
+  private static final int VERSION_OFFSET = MAGIC_BYTES.length;
+  private static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
+  private static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
+  private static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
+
+  private final PinotDataBuffer _dataBuffer;
+
+  /**
+   * The offset of the data section in the buffer/store. This info will be persisted in the header
+   * so it has to be read from the buffer while initializing the store in read cases.
+   */
+  private final int _dataSectionStartOffSet;
+
+  /**
+   * Total number of values present in the store.
+   */
+  private final int _numElements;
+
+  /**
+   * Constructor to create a VarLengthBytesValueReaderWriter from a previously written buffer.
+   */
+  public VarLengthBytesValueReaderWriter(PinotDataBuffer dataBuffer) {
+    _dataBuffer = dataBuffer;
+
+    // To prepare ourselves to start reading the data, initialize the data offset.
+    _numElements = dataBuffer.getInt(NUM_ELEMENTS_OFFSET);
+    _dataSectionStartOffSet = dataBuffer.getInt(DATA_SECTION_OFFSET_POSITION);
+  }
+
+  /**
+   * Constructor to create a new immutable store with the given data.
+   */
+  public VarLengthBytesValueReaderWriter(PinotDataBuffer dataBuffer, byte[][] byteArrays) {
+    _dataBuffer = dataBuffer;
+    _numElements = byteArrays.length;
+
+    // For now, start writing the data section right after the header but if this store evolves,
+    // we could decide to start the data section somewhere else.
+    _dataSectionStartOffSet = HEADER_LENGTH;
+
+    write(byteArrays);
+  }
+
+  public static long getRequiredSize(byte[][] byteArrays) {
+    // First include the header and then the data section.
+    // Remember there are n+1 offsets
+    long length = HEADER_LENGTH + Integer.BYTES * (byteArrays.length + 1);
+
+    for (byte[] array : byteArrays) {
+      length += array.length;
+    }
+    return length;
+  }
+
+  public static boolean isVarLengthBytesDictBuffer(PinotDataBuffer buffer) {
+    // If the buffer is smaller than header size, it's not var length dictionary.
+    if (buffer.size() > HEADER_LENGTH) {
+      byte[] magicBytes = new byte[MAGIC_BYTES.length];
+      buffer.copyTo(0, magicBytes, 0, MAGIC_BYTES.length);
+
+      if (Arrays.equals(MAGIC_BYTES, magicBytes)) {
+        // Verify the version.
+        if (VERSION == buffer.getInt(MAGIC_BYTES.length)) {
+          // Also verify that there is a valid numElements value and valid offset for data section.
+          return buffer.getInt(NUM_ELEMENTS_OFFSET) >= 0 && buffer.getInt(DATA_SECTION_OFFSET_POSITION) > 0;
+        }
+      }
+    }
+
+    return false;
+  }
+
+  private void writeHeader() {
+    for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
+      _dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
+    }
+    _dataBuffer.putInt(VERSION_OFFSET, VERSION);
+    _dataBuffer.putInt(NUM_ELEMENTS_OFFSET, _numElements);
+    _dataBuffer.putInt(DATA_SECTION_OFFSET_POSITION, _dataSectionStartOffSet);
+  }
+
+  private void write(byte[][] byteArrays) {
+    writeHeader();
+
+    // Then write the offset of each of the byte array in the data buffer.
+    int nextOffset = _dataSectionStartOffSet;
+    int nextArrayStartOffset = _dataSectionStartOffSet + Integer.BYTES * (byteArrays.length + 1);
+    for (byte[] array : byteArrays) {
+      _dataBuffer.putInt(nextOffset, nextArrayStartOffset);
+      nextOffset += Integer.BYTES;
+      nextArrayStartOffset += array.length;
+    }
+
+    // Write the additional offset to easily get the length of last array.
+    _dataBuffer.putInt(nextOffset, nextArrayStartOffset);
+
+    // Finally write the byte arrays.
+    nextArrayStartOffset = _dataSectionStartOffSet + Integer.BYTES * (byteArrays.length + 1);
+    for (byte[] array : byteArrays) {
+      _dataBuffer.readFrom(nextArrayStartOffset, array);
+      nextArrayStartOffset += array.length;
+    }
+  }
+
+  public int getNumElements() {
+    return _numElements;
+  }
+
+  @Override
+  public int getInt(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public long getLong(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public float getFloat(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public double getDouble(int index) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getUnpaddedString(int index, int numBytesPerValue, byte paddingByte, byte[] buffer) {
+    return StringUtil.decodeUtf8(getBytes(index, numBytesPerValue, buffer));
+  }
+
+  @Override
+  public String getPaddedString(int index, int numBytesPerValue, byte[] buffer) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[] getBytes(int index, int numBytesPerValue, byte[] buffer) {
+    // Read the offset of the byte array first and then read the actual byte array.
+    int offset = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * index);
+
+    // To get the length of the byte array, we use the next byte array offset.
+    int length = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * (index + 1)) - offset;
+
+    byte[] b;
+    // If the caller didn't pass a buffer, create one with exact length.
+    if (buffer == null) {
+      b = new byte[length];
+    } else {
+      // If the buffer passed by the caller isn't big enough, create a new one.
+      b = buffer.length == length ? buffer : new byte[length];
+    }
+    _dataBuffer.copyTo(offset, b, 0, length);
+    return b;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _dataBuffer.close();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 40be8e7..d833603 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -54,10 +54,12 @@ public class RealtimeSegmentConverter {
   private List<String> invertedIndexColumns;
   private List<String> noDictionaryColumns;
   private StarTreeIndexSpec starTreeIndexSpec;
+  private List<String> varLengthDictionaryColumns;
 
   public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
       String tableName, String timeColumnName, String segmentName, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> noDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec) {
+      List<String> invertedIndexColumns, List<String> noDictionaryColumns,
+      List<String> varLengthDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec) {
     if (new File(outputPath).exists()) {
       throw new IllegalAccessError("path already exists:" + outputPath);
     }
@@ -73,13 +75,14 @@ public class RealtimeSegmentConverter {
     this.tableName = tableName;
     this.segmentName = segmentName;
     this.noDictionaryColumns = noDictionaryColumns;
+    this.varLengthDictionaryColumns = varLengthDictionaryColumns;
     this.starTreeIndexSpec = starTreeIndexSpec;
   }
 
   public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
       String tableName, String timeColumnName, String segmentName, String sortedColumn) {
     this(realtimeSegment, outputPath, schema, tableName, timeColumnName, segmentName, sortedColumn, new ArrayList<>(),
-        new ArrayList<>(), null/*StarTreeIndexSpec*/);
+        new ArrayList<>(), new ArrayList<>(), null/*StarTreeIndexSpec*/);
   }
 
   public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics)
@@ -109,6 +112,10 @@ public class RealtimeSegmentConverter {
       genConfig.setRawIndexCompressionType(columnToCompressionType);
     }
 
+    if (varLengthDictionaryColumns != null) {
+      genConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
+    }
+
     // Presence of the spec enables star tree generation.
     if (starTreeIndexSpec != null) {
       genConfig.enableStarTreeIndex(starTreeIndexSpec);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 63ebbbb..6d18265 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -32,6 +32,7 @@ public class RealtimeSegmentConfig {
   private final int _capacity;
   private final int _avgNumMultiValues;
   private final Set<String> _noDictionaryColumns;
+  private final Set<String> _varLengthDictionaryColumns;
   private final Set<String> _invertedIndexColumns;
   private final RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
   private final boolean _offHeap;
@@ -41,8 +42,9 @@ public class RealtimeSegmentConfig {
   private final boolean _aggregateMetrics;
 
   private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity,
-      int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
-      RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
+      int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
+      Set<String> invertedIndexColumns, RealtimeSegmentZKMetadata realtimeSegmentZKMetadata,
+      boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, SegmentPartitionConfig segmentPartitionConfig,
       boolean aggregateMetrics) {
     _segmentName = segmentName;
@@ -51,6 +53,7 @@ public class RealtimeSegmentConfig {
     _capacity = capacity;
     _avgNumMultiValues = avgNumMultiValues;
     _noDictionaryColumns = noDictionaryColumns;
+    _varLengthDictionaryColumns = varLengthDictionaryColumns;
     _invertedIndexColumns = invertedIndexColumns;
     _realtimeSegmentZKMetadata = realtimeSegmentZKMetadata;
     _offHeap = offHeap;
@@ -84,6 +87,10 @@ public class RealtimeSegmentConfig {
     return _noDictionaryColumns;
   }
 
+  public Set<String> getVarLengthDictionaryColumns() {
+    return _varLengthDictionaryColumns;
+  }
+
   public Set<String> getInvertedIndexColumns() {
     return _invertedIndexColumns;
   }
@@ -119,6 +126,7 @@ public class RealtimeSegmentConfig {
     private int _capacity;
     private int _avgNumMultiValues;
     private Set<String> _noDictionaryColumns;
+    private Set<String> _varLengthDictionaryColumns;
     private Set<String> _invertedIndexColumns;
     private RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
     private boolean _offHeap;
@@ -160,6 +168,11 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setVarLengthDictionaryColumns(Set<String> varLengthDictionaryColumns) {
+      _varLengthDictionaryColumns = varLengthDictionaryColumns;
+      return this;
+    }
+
     public Builder setInvertedIndexColumns(Set<String> invertedIndexColumns) {
       _invertedIndexColumns = invertedIndexColumns;
       return this;
@@ -197,7 +210,8 @@ public class RealtimeSegmentConfig {
 
     public RealtimeSegmentConfig build() {
       return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues,
-          _noDictionaryColumns, _invertedIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
+          _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
+          _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
           _statsHistory, _segmentPartitionConfig, _aggregateMetrics);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
index 84f74d1..e435188 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 
 public class ColumnIndexCreationInfo {
   private final boolean createDictionary;
+  private final boolean useVarLengthDictionary;
   private final ForwardIndexType forwardIndexType;
   private final InvertedIndexType invertedIndexType;
   private final boolean isAutoGenerated;
@@ -33,10 +34,11 @@ public class ColumnIndexCreationInfo {
   private final ColumnStatistics columnStatistics;
 
   public ColumnIndexCreationInfo(ColumnStatistics columnStatistics, boolean createDictionary,
-      ForwardIndexType forwardIndexType, InvertedIndexType invertedIndexType, boolean isAutoGenerated,
-      Object defaultNullValue) {
+      boolean useVarLengthDictionary, ForwardIndexType forwardIndexType,
+      InvertedIndexType invertedIndexType, boolean isAutoGenerated, Object defaultNullValue) {
     this.columnStatistics = columnStatistics;
     this.createDictionary = createDictionary;
+    this.useVarLengthDictionary = useVarLengthDictionary;
     this.forwardIndexType = forwardIndexType;
     this.invertedIndexType = invertedIndexType;
     this.isAutoGenerated = isAutoGenerated;
@@ -47,6 +49,10 @@ public class ColumnIndexCreationInfo {
     return createDictionary;
   }
 
+  public boolean isUseVarLengthDictionary() {
+    return useVarLengthDictionary;
+  }
+
   public Object getMin() {
     return columnStatistics.getMinValue();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2016776..7b08a48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -129,7 +129,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
         // Initialize dictionary creator
         SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir);
+            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(),
+                fieldSpec, _indexDir, indexCreationInfo.isUseVarLengthDictionary());
         _dictionaryCreatorMap.put(columnName, dictionaryCreator);
 
         // Create dictionary
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
index f9243ef..105c5fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.io.util.FixedByteValueReaderWriter;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class SegmentDictionaryCreator implements Closeable {
   private final Object _sortedValues;
   private final FieldSpec _fieldSpec;
   private final File _dictionaryFile;
+  private final boolean _useVarLengthDictionary;
 
   private Int2IntOpenHashMap _intValueToIndexMap;
   private Long2IntOpenHashMap _longValueToIndexMap;
@@ -54,12 +56,18 @@ public class SegmentDictionaryCreator implements Closeable {
   private Object2IntOpenHashMap<ByteArray> _bytesValueToIndexMap;
   private int _numBytesPerEntry = 0;
 
-  public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir)
-      throws IOException {
+  public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir,
+      boolean useVarLengthDictionary) throws IOException {
     _sortedValues = sortedValues;
     _fieldSpec = fieldSpec;
     _dictionaryFile = new File(indexDir, fieldSpec.getName() + V1Constants.Dict.FILE_EXTENSION);
     FileUtils.touch(_dictionaryFile);
+    _useVarLengthDictionary = useVarLengthDictionary;
+  }
+
+  public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir)
+      throws IOException {
+    this(sortedValues, fieldSpec, indexDir, false);
   }
 
   public void build()
@@ -163,16 +171,7 @@ public class SegmentDictionaryCreator implements Closeable {
           _numBytesPerEntry = Math.max(_numBytesPerEntry, valueBytes.length);
         }
 
-        // Backward-compatible: index file is always big-endian
-        try (PinotDataBuffer dataBuffer = PinotDataBuffer
-            .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
-                getClass().getSimpleName());
-            FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
-          for (int i = 0; i < numValues; i++) {
-            byte[] value = sortedStringBytes[i];
-            writer.writeUnpaddedString(i, _numBytesPerEntry, value);
-          }
-        }
+        writeBytesValueDictionary(sortedStringBytes);
         LOGGER.info(
             "Created dictionary for STRING column: {} with cardinality: {}, max length in bytes: {}, range: {} to {}",
             _fieldSpec.getName(), numValues, _numBytesPerEntry, sortedStrings[0], sortedStrings[numValues - 1]);
@@ -185,22 +184,15 @@ public class SegmentDictionaryCreator implements Closeable {
         Preconditions.checkState(numValues > 0);
         _bytesValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
 
+        byte[][] sortedByteArrays = new byte[sortedBytes.length][];
         for (int i = 0; i < numValues; i++) {
           ByteArray value = sortedBytes[i];
+          sortedByteArrays[i] = value.getBytes();
           _bytesValueToIndexMap.put(value, i);
           _numBytesPerEntry = Math.max(_numBytesPerEntry, value.getBytes().length);
         }
 
-        // Backward-compatible: index file is always big-endian
-        try (PinotDataBuffer dataBuffer = PinotDataBuffer
-            .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
-                getClass().getSimpleName());
-            FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
-          for (int i = 0; i < numValues; i++) {
-            byte[] value = sortedBytes[i].getBytes();
-            writer.writeUnpaddedString(i, _numBytesPerEntry, value);
-          }
-        }
+        writeBytesValueDictionary(sortedByteArrays);
         LOGGER.info(
             "Created dictionary for BYTES column: {} with cardinality: {}, max length in bytes: {}, range: {} to {}",
             _fieldSpec.getName(), numValues, _numBytesPerEntry, sortedBytes[0], sortedBytes[numValues - 1]);
@@ -211,6 +203,42 @@ public class SegmentDictionaryCreator implements Closeable {
     }
   }
 
+  /**
+   * Helper method to write the given sorted byte[][] to an immutable bytes value dictionary.
+   * The dictionary implementation is chosen based on configuration at column level.
+   *
+   * @param sortedByteArrays The actual sorted byte arrays to be written to the store.
+   */
+  private void writeBytesValueDictionary(byte[][] sortedByteArrays)
+      throws IOException {
+
+    if (_useVarLengthDictionary) {
+      // Backward-compatible: index file is always big-endian
+      long size = VarLengthBytesValueReaderWriter.getRequiredSize(sortedByteArrays);
+      try (PinotDataBuffer dataBuffer = PinotDataBuffer
+          .mapFile(_dictionaryFile, false, 0, size, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
+          VarLengthBytesValueReaderWriter writer = new VarLengthBytesValueReaderWriter(dataBuffer, sortedByteArrays)) {
+
+        LOGGER.info("Using variable length bytes dictionary for column: {}, size: {}, numElements: {}",
+            _fieldSpec.getName(), size, writer.getNumElements());
+      }
+    } else {
+      // Backward-compatible: index file is always big-endian
+      try (PinotDataBuffer dataBuffer = PinotDataBuffer
+          .mapFile(_dictionaryFile, false, 0, (long) sortedByteArrays.length * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
+              getClass().getSimpleName());
+          FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
+        for (int i = 0; i < sortedByteArrays.length; i++) {
+          byte[] value = sortedByteArrays[i];
+          writer.writeUnpaddedString(i, _numBytesPerEntry, value);
+        }
+
+        LOGGER.info("Using fixed bytes value dictionary for column: {}, size: {}", _fieldSpec.getName(),
+            (long) sortedByteArrays.length * _numBytesPerEntry);
+      }
+    }
+  }
+
   public int getNumBytesPerEntry() {
     return _numBytesPerEntry;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 645165d..248f16d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -25,10 +25,12 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.MetricFieldSpec;
@@ -482,8 +484,10 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
       }
 
       ColumnStatistics columnProfile = segmentStats.getColumnProfileFor(column);
+      Set<String> varLengthDictionaryColumns = new HashSet<>(config.getVarLengthDictionaryColumns());
       indexCreationInfoMap.put(column,
-          new ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/, ForwardIndexType.FIXED_BIT_COMPRESSED,
+          new ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/,
+              varLengthDictionaryColumns.contains(column), ForwardIndexType.FIXED_BIT_COMPRESSED,
               InvertedIndexType.ROARING_BITMAPS, false/*isAutoGenerated*/,
               dataSchema.getFieldSpecFor(column).getDefaultNullValue()));
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 7560ec9..7dcda28 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -46,6 +46,7 @@ public class IndexLoadingConfig {
   private Set<String> _invertedIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
+  private Set<String> _varLengthDictionaryColumns = new HashSet<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
   private Set<String> _bloomFilterColumns = new HashSet<>();
 
@@ -95,6 +96,11 @@ public class IndexLoadingConfig {
       _noDictionaryConfig.putAll(noDictionaryConfig);
     }
 
+    List<String> varLengthDictionaryColumns = indexingConfig.getVarLengthDictionaryColumns();
+    if (varLengthDictionaryColumns != null) {
+      _varLengthDictionaryColumns.addAll(varLengthDictionaryColumns);
+    }
+
     List<String> onHeapDictionaryColumns = indexingConfig.getOnHeapDictionaryColumns();
     if (onHeapDictionaryColumns != null) {
       _onHeapDictionaryColumns.addAll(onHeapDictionaryColumns);
@@ -192,6 +198,11 @@ public class IndexLoadingConfig {
   }
 
   @Nonnull
+  public Set<String> getVarLengthDictionaryColumns() {
+    return _varLengthDictionaryColumns;
+  }
+
+  @Nonnull
   public Set<String> getOnHeapDictionaryColumns() {
     return _onHeapDictionaryColumns;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 9509982..232bd91 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -311,12 +311,14 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
             totalDocs, maxNumberOfMultiValueElements);
 
     ColumnIndexCreationInfo columnIndexCreationInfo =
-        new ColumnIndexCreationInfo(columnStatistics, true/*createDictionary*/, ForwardIndexType.FIXED_BIT_COMPRESSED,
-            InvertedIndexType.SORTED_INDEX, true/*isAutoGenerated*/, defaultValue/*defaultNullValue*/);
+        new ColumnIndexCreationInfo(columnStatistics, true/*createDictionary*/, false,
+            ForwardIndexType.FIXED_BIT_COMPRESSED, InvertedIndexType.SORTED_INDEX, true/*isAutoGenerated*/,
+            defaultValue/*defaultNullValue*/);
 
     // Create dictionary.
     // We will have only one value in the dictionary.
-    try (SegmentDictionaryCreator creator = new SegmentDictionaryCreator(sortedArray, fieldSpec, _indexDir)) {
+    try (SegmentDictionaryCreator creator =
+        new SegmentDictionaryCreator(sortedArray, fieldSpec, _indexDir, false)) {
       creator.build();
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java
index 80541df..181de10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.io.util.FixedByteValueReaderWriter;
 import org.apache.pinot.core.io.util.ValueReader;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 
 
@@ -36,11 +37,18 @@ public abstract class ImmutableDictionaryReader extends BaseDictionary {
   private final byte _paddingByte;
 
   protected ImmutableDictionaryReader(PinotDataBuffer dataBuffer, int length, int numBytesPerValue, byte paddingByte) {
-    Preconditions.checkState(dataBuffer.size() == length * numBytesPerValue);
-    _valueReader = new FixedByteValueReaderWriter(dataBuffer);
+    if (VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(dataBuffer)) {
+      _valueReader = new VarLengthBytesValueReaderWriter(dataBuffer);
+      _numBytesPerValue = -1;
+      _paddingByte = 0;
+    }
+    else {
+      Preconditions.checkState(dataBuffer.size() == length * numBytesPerValue);
+      _valueReader = new FixedByteValueReaderWriter(dataBuffer);
+      _numBytesPerValue = numBytesPerValue;
+      _paddingByte = paddingByte;
+    }
     _length = length;
-    _numBytesPerValue = numBytesPerValue;
-    _paddingByte = paddingByte;
   }
 
   protected ImmutableDictionaryReader(ValueReader valueReader, int length) {
@@ -244,6 +252,6 @@ public abstract class ImmutableDictionaryReader extends BaseDictionary {
   }
 
   protected byte[] getBuffer() {
-    return new byte[_numBytesPerValue];
+    return _numBytesPerValue == -1 ? null : new byte[_numBytesPerValue];
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
index 51fcdb0..c56da38 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
@@ -126,7 +126,7 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
       try {
         return FileUtils.sizeOfDirectory(segmentDirectory.toPath().toFile());
       } catch (IllegalArgumentException e) {
-        LOGGER.error("Failed to read disk size for direcotry: ", segmentDirectory.getAbsolutePath());
+        LOGGER.error("Failed to read disk size for directory: ", segmentDirectory.getAbsolutePath());
         return -1;
       }
     } else {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index ceed347..5974cb2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -52,6 +52,7 @@ public class MutableSegmentImplAggregateMetricsTest {
         .build();
     _mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(DIMENSION_1, METRIC)),
+            new HashSet<>(Arrays.asList(DIMENSION_1)),
             Collections.singleton(DIMENSION_1), true);
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index af1a0ab..70ca0bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -76,7 +76,8 @@ public class MutableSegmentImplTest {
 
     _schema = config.getSchema();
     _mutableSegmentImpl = MutableSegmentImplTestUtils
-        .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), false);
+        .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
+            Collections.emptySet(),false);
     _lastIngestionTimeMs = System.currentTimeMillis();
     StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
     _startTimeMs = System.currentTimeMillis();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 96e7bda..b60bed5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -39,7 +39,8 @@ public class MutableSegmentImplTestUtils {
   private static final String STEAM_NAME = "testStream";
 
   public static MutableSegmentImpl createMutableSegmentImpl(@Nonnull Schema schema,
-      @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics) {
+      @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> varLengthDictionaryColumns,
+      @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics) {
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -47,6 +48,7 @@ public class MutableSegmentImplTestUtils {
     RealtimeSegmentConfig realtimeSegmentConfig =
         new RealtimeSegmentConfig.Builder().setSegmentName(SEGMENT_NAME).setStreamName(STEAM_NAME).setSchema(schema)
             .setCapacity(100000).setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns)
+            .setVarLengthDictionaryColumns(varLengthDictionaryColumns)
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
             .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             .setAggregateMetrics(aggregateMetrics).build();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
new file mode 100644
index 0000000..069da08
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link VarLengthBytesValueReaderWriter}
+ */
+public class VarLengthBytesValueReaderWriterTest {
+  private final Random random = new Random();
+
+  @Test
+  public void testEmptyDictionary()
+      throws IOException {
+    byte[][] byteArrays = new byte[][]{};
+    long size = VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays);
+    Assert.assertEquals(size, 20);
+
+    final File tempFile =
+        new File(FileUtils.getTempDirectory(), VarLengthBytesValueReaderWriterTest.class.getName() + random.nextInt());
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer, byteArrays);
+      Assert.assertEquals(readerWriter.getNumElements(), 0);
+    }
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
+      Assert.assertEquals(readerWriter.getNumElements(), 0);
+    } finally {
+      FileUtils.forceDelete(tempFile);
+    }
+  }
+
+  @Test
+  public void testSingleByteArray()
+      throws IOException {
+    byte[] array = new byte[]{1, 2, 3, 4};
+    byte[][] byteArrays = new byte[][]{array};
+    long size = VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays);
+    Assert.assertEquals(size, 28);
+
+    final File tempFile =
+        new File(FileUtils.getTempDirectory(), VarLengthBytesValueReaderWriterTest.class.getName() + random.nextInt());
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer, byteArrays);
+      Assert.assertEquals(readerWriter.getNumElements(), 1);
+    }
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
+      Assert.assertEquals(readerWriter.getNumElements(), 1);
+      byte[] newArray = readerWriter.getBytes(0, -1, null);
+      Assert.assertTrue(Arrays.equals(array, newArray));
+    } finally {
+      FileUtils.forceDelete(tempFile);
+    }
+  }
+
+  @Test
+  public void testArbitraryLengthByteArray()
+      throws IOException {
+    Random random = new Random();
+    int numByteArrays = random.nextInt(100);
+    byte[][] byteArrays = new byte[numByteArrays][];
+    for (int i = 0; i < numByteArrays; i++) {
+      byteArrays[i] = new byte[i + 1];
+      random.nextBytes(byteArrays[i]);
+    }
+    long size = VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays);
+
+    final File tempFile =
+        new File(FileUtils.getTempDirectory(), VarLengthBytesValueReaderWriterTest.class.getName() + random.nextInt());
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer, byteArrays);
+      Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
+    }
+
+    try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+      VarLengthBytesValueReaderWriter readerWriter = new VarLengthBytesValueReaderWriter(buffer);
+      Assert.assertEquals(byteArrays.length, readerWriter.getNumElements());
+      for (int i = 0; i < byteArrays.length; i++) {
+        byte[] array = byteArrays[i];
+        byte[] newArray = readerWriter.getBytes(i, -1, null);
+        Assert.assertTrue(Arrays.equals(array, newArray));
+      }
+    } finally {
+      FileUtils.forceDelete(tempFile);
+    }
+  }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
index dcc63ca..83fb6e2 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
@@ -129,6 +129,18 @@ public class BenchmarkDictionaryCreation {
     }
   }
 
+  @Benchmark
+  @BenchmarkMode(Mode.SampleTime)
+  @OutputTimeUnit(TimeUnit.MILLISECONDS)
+  public int benchmarkVarLengthStringDictionaryCreation()
+      throws IOException {
+    try (SegmentDictionaryCreator dictionaryCreator =
+        new SegmentDictionaryCreator(_sortedStrings, STRING_FIELD, INDEX_DIR, true)) {
+      dictionaryCreator.build();
+      return dictionaryCreator.indexOfSV("0");
+    }
+  }
+
   @TearDown
   public void tearDown()
       throws Exception {
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
index c703a3e..10b3370 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.perf;
 
+import com.google.common.base.Joiner;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
 import org.apache.pinot.common.data.DimensionFieldSpec;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
@@ -45,12 +47,18 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
  * Performance test for lookup in string dictionary.
  */
 public class StringDictionaryPerfTest {
-  private static final int MAX_STRING_LENGTH = 100;
+  private static final int MAX_STRING_LENGTH = 1000;
+  private static final boolean USE_FIXED_SIZE_STRING = true;
   private static final String TMP_DIR = System.getProperty("java.io.tmpdir");
   private static final String COLUMN_NAME = "test";
-  private static final int TOTAL_NUM_LOOKUPS = 100_000;
-
-  String[] _inputStrings;
+  private static final String[] STATS_HEADERS = new String[] {
+      "DictSize", "TimeTaken(ms)", "SegmentSize", "NumLookups", "Min", "Max", "Mean",
+      "StdDev", "Median", "Skewness", "Kurtosis", "Variance", "BufferSize"
+  };
+  private static final Joiner COMMA_JOINER = Joiner.on(",");
+
+  private final DescriptiveStatistics _statistics = new DescriptiveStatistics();
+  private String[] _inputStrings;
   private File _indexDir;
   private int _dictLength;
 
@@ -64,7 +72,7 @@ public class StringDictionaryPerfTest {
    * @param dictLength Length of the dictionary
    * @throws Exception
    */
-  public void buildSegment(int dictLength)
+  private void buildSegment(int dictLength)
       throws Exception {
     Schema schema = new Schema();
     String segmentName = "perfTestSegment" + System.currentTimeMillis();
@@ -89,14 +97,17 @@ public class StringDictionaryPerfTest {
     int i = 0;
     while (i < dictLength) {
       HashMap<String, Object> map = new HashMap<>();
-      String randomString = RandomStringUtils.randomAlphanumeric(1 + random.nextInt(MAX_STRING_LENGTH));
+      String randomString = RandomStringUtils.randomAlphanumeric(
+          USE_FIXED_SIZE_STRING ? MAX_STRING_LENGTH : (1 + random.nextInt(MAX_STRING_LENGTH)));
 
       if (uniqueStrings.contains(randomString)) {
         continue;
       }
 
       _inputStrings[i] = randomString;
-      uniqueStrings.add(randomString);
+      if (uniqueStrings.add(randomString)) {
+        _statistics.addValue(randomString.length());
+      }
       map.put("test", _inputStrings[i++]);
 
       GenericRow genericRow = new GenericRow();
@@ -130,20 +141,61 @@ public class StringDictionaryPerfTest {
     }
 
     FileUtils.deleteQuietly(_indexDir);
-    System.out.println("Total time for " + TOTAL_NUM_LOOKUPS + " lookups: " + (System.currentTimeMillis() - start));
+    System.out.println("Total time for " + numLookups + " lookups: "
+        + (System.currentTimeMillis() - start) + "ms");
   }
 
-  public static void main(String[] args)
-      throws Exception {
-    if (args.length != 2) {
-      System.out.println("Usage: StringDictionaryPerfRunner <dictionary_length> <num_lookups> ");
+  /**
+   * Measures the performance of string dictionary lookups by dictId performing the provided
+   * number of reads from dictionary at random indices.
+   *
+   * @param numGetValues Number of values to read from the dictionary
+   * @throws Exception
+   */
+  private String[] perfTestGetValues(int numGetValues) throws Exception {
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, ReadMode.heap);
+    Dictionary dictionary = immutableSegment.getDictionary(COLUMN_NAME);
+
+    Random random = new Random(System.nanoTime());
+    long start = System.currentTimeMillis();
+
+    for (int i = 0; i < numGetValues; i++) {
+      int index = random.nextInt(_dictLength);
+      dictionary.get(index);
+    }
+
+    long segmentSize = immutableSegment.getSegmentSizeBytes();
+    FileUtils.deleteQuietly(_indexDir);
+    long time = System.currentTimeMillis() - start;
+    System.out.println("Total time for " + numGetValues + " lookups: " + time + "ms");
+    return new String[] {String.valueOf(_statistics.getN()), String.valueOf(time),
+        String.valueOf(segmentSize), String.valueOf(numGetValues),
+        String.valueOf(_statistics.getMin()), String.valueOf(_statistics.getMax()),
+        String.valueOf(_statistics.getMean()), String.valueOf(_statistics.getStandardDeviation()),
+        String.valueOf(_statistics.getPercentile(50.0D)),
+        String.valueOf(_statistics.getSkewness()), String.valueOf(_statistics.getKurtosis()),
+        String.valueOf(_statistics.getVariance())
+    };
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.out.println("Usage: StringDictionaryPerfTest <dictionary_length> <dictionary_length> ... <num_lookups> ");
     }
 
-    int dictLength = Integer.valueOf(args[0]);
-    int numLookups = Integer.valueOf(args[1]);
+    int numLookups = Integer.valueOf(args[args.length - 1]);
 
-    StringDictionaryPerfTest test = new StringDictionaryPerfTest();
-    test.buildSegment(dictLength);
-    test.perfTestLookups(numLookups);
+    String[][] stats = new String[args.length][];
+    stats[0] = STATS_HEADERS;
+    for (int i = 0; i < args.length - 1; i++) {
+      int dictLength = Integer.valueOf(args[i]);
+      StringDictionaryPerfTest test = new StringDictionaryPerfTest();
+      test.buildSegment(dictLength);
+//    test.perfTestLookups(numLookups);
+      stats[i + 1] = test.perfTestGetValues(numLookups);
+    }
+    for (String[] s: stats) {
+      System.out.println(COMMA_JOINER.join(s));
+    }
   }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 52ba2b4..8baf3c6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -59,6 +59,7 @@ public class MemoryEstimator {
   private long _sampleCompletedSegmentSizeBytes;
   private Set<String> _invertedIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>();
+  private Set<String> _varLengthDictionaryColumns = new HashSet<>();
   int _avgMultiValues;
   private File _tableDataDir;
 
@@ -83,6 +84,9 @@ public class MemoryEstimator {
     if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns())) {
       _noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns());
     }
+    if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getVarLengthDictionaryColumns())) {
+      _varLengthDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getVarLengthDictionaryColumns());
+    }
     if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getInvertedIndexColumns())) {
       _invertedIndexColumns.addAll(_tableConfig.getIndexingConfig().getInvertedIndexColumns());
     }
@@ -123,8 +127,11 @@ public class MemoryEstimator {
         new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
             .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
             .setCapacity(_segmentMetadata.getTotalDocs()).setAvgNumMultiValues(_avgMultiValues)
-            .setNoDictionaryColumns(_noDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
-            .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
+            .setNoDictionaryColumns(_noDictionaryColumns)
+            .setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
+            .setInvertedIndexColumns(_invertedIndexColumns)
+            .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
+            .setMemoryManager(memoryManager)
             .setStatsHistory(sampleStatsHistory);
 
     // create mutable segment impl
@@ -220,8 +227,11 @@ public class MemoryEstimator {
       RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
           new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
               .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
-              .setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
-              .setInvertedIndexColumns(_invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
+              .setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
+              .setNoDictionaryColumns(_noDictionaryColumns)
+              .setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
+              .setInvertedIndexColumns(_invertedIndexColumns)
+              .setRealtimeSegmentZKMetadata(segmentZKMetadata)
               .setOffHeap(true).setMemoryManager(memoryManager).setStatsHistory(statsHistory);
 
       // create mutable segment impl


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org