You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/01/22 21:06:18 UTC

[incubator-pinot] branch segment-creation-in-one-pass created (now 7bcaa01)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch segment-creation-in-one-pass
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 7bcaa01  Support data ingestion for offline segment in one pass

This branch includes the following new commits:

     new 7bcaa01  Support data ingestion for offline segment in one pass

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Support data ingestion for offline segment in one pass

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch segment-creation-in-one-pass
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7bcaa01d55070b36a1f3219e0cf33de8724e7084
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Jan 22 13:04:21 2021 -0800

    Support data ingestion for offline segment in one pass
---
 .../readers/IntermediateSegmentRecordReader.java   |  78 +++++
 .../generator/SegmentGeneratorConfig.java          |   9 +
 .../indexsegment/mutable/IntermediateSegment.java  | 373 +++++++++++++++++++++
 .../RealtimeIndexOffHeapMemoryManager.java         |   8 +-
 ...tatistics.java => MutableColumnStatistics.java} |   4 +-
 ....java => MutableNoDictionaryColStatistics.java} |   4 +-
 .../stats/RealtimeSegmentStatsContainer.java       |   4 +-
 ...termediateSegmentSegmentCreationDataSource.java |  56 ++++
 .../creator/IntermediateSegmentStatsContainer.java |  52 +++
 .../creator/impl/SegmentColumnarIndexCreator.java  |   2 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |  11 +-
 .../core/segment/creator/impl/V1Constants.java     |   2 +-
 .../index/column/IntermediateIndexContainer.java   | 134 ++++++++
 .../core/segment/index/column/NumValuesInfo.java   |  41 +++
 .../segment/index/loader/IndexLoadingConfig.java   |   3 +
 .../segment/index/metadata/ColumnMetadata.java     |   2 +-
 .../core/indexsegment/IntermediateSegmentTest.java | 222 ++++++++++++
 17 files changed, 992 insertions(+), 13 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
new file mode 100644
index 0000000..0c2daaf
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/IntermediateSegmentRecordReader.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.readers;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
+
+
+/**
+ * {@link RecordReader} that replays the rows indexed in an {@link IntermediateSegment}, in doc id order.
+ *
+ * <p>The number of documents is captured when the reader is constructed; rows indexed into the segment afterwards are
+ * not visible through this reader. {@link #init} is a no-op because the data source is supplied via the constructor.
+ */
+public class IntermediateSegmentRecordReader implements RecordReader {
+  private final IntermediateSegment _intermediateSegment;
+  // Snapshot of the segment's doc count taken at construction time.
+  private final int _numDocs;
+
+  // Doc id of the row returned by the next call to next(); reset to 0 by rewind().
+  private int _nextDocId = 0;
+
+  public IntermediateSegmentRecordReader(IntermediateSegment intermediateSegment) {
+    _intermediateSegment = intermediateSegment;
+    _numDocs = intermediateSegment.getNumDocsIndexed();
+  }
+
+  /**
+   * No-op: the reader is fully initialized by its constructor, so the file, field set and config are ignored.
+   */
+  @Override
+  public void init(File dataFile, Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+      throws IOException {
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _nextDocId < _numDocs;
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  /**
+   * Reads the next row into {@code reuse} and advances the cursor.
+   *
+   * @param reuse row object to populate; it is returned after being filled
+   * @return {@code reuse}, populated with every column of the next doc
+   * @throws NoSuchElementException if all rows have already been read. Without this check the out-of-range doc id
+   *         would be handed straight to the segment's forward indexes.
+   */
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException(
+          "No more records to read: all " + _numDocs + " docs of intermediate segment "
+              + _intermediateSegment.getSegmentName() + " have been read");
+    }
+    return _intermediateSegment.getRecord(_nextDocId++, reuse);
+  }
+
+  /**
+   * Restarts iteration from the first doc.
+   */
+  @Override
+  public void rewind()
+      throws IOException {
+    _nextDocId = 0;
+  }
+
+  /**
+   * No-op: the reader owns no resources; the underlying segment is not destroyed here.
+   */
+  @Override
+  public void close()
+      throws IOException {
+  }
+
+  public IntermediateSegment getIntermediateSegment() {
+    return _intermediateSegment;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index c630a15..a65c312 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -102,6 +102,7 @@ public class SegmentGeneratorConfig implements Serializable {
   private boolean _onHeap = false;
   private boolean _skipTimeValueCheck = false;
   private boolean _nullHandlingEnabled = false;
+  private boolean _isIntermediateSegmentRecordReader = false;
 
   // constructed from FieldConfig
   private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
@@ -650,4 +651,12 @@ public class SegmentGeneratorConfig implements Serializable {
   public void setNullHandlingEnabled(boolean nullHandlingEnabled) {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
+
+  public boolean isIntermediateSegmentRecordReader() {
+    return _isIntermediateSegmentRecordReader;
+  }
+
+  public void setIntermediateSegmentRecordReader(boolean intermediateSegmentRecordReader) {
+    _isIntermediateSegmentRecordReader = intermediateSegmentRecordReader;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
new file mode 100644
index 0000000..056a05b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/IntermediateSegment.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.mutable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
+import org.apache.pinot.core.io.writer.impl.MmapMemoryManager;
+import org.apache.pinot.core.realtime.impl.dictionary.MutableDictionaryFactory;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteMVMutableForwardIndex;
+import org.apache.pinot.core.realtime.impl.forward.FixedByteSVMutableForwardIndex;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.column.IntermediateIndexContainer;
+import org.apache.pinot.core.segment.index.column.NumValuesInfo;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * In-memory mutable segment that indexes {@link GenericRow}s one at a time so that an offline segment can be built
+ * in a single pass over the input data.
+ *
+ * <p>Every schema column gets a {@link MutableDictionary} plus a dictionary-id forward index (single- or multi-value),
+ * all allocated through one {@link PinotDataBufferMemoryManager}. Call {@link #destroy()} once the segment is no
+ * longer needed to release that memory.
+ *
+ * <p>NOTE(review): {@code _numDocsIndexed} is volatile but updated with a non-atomic increment, so this class
+ * assumes a single writer thread.
+ */
+public class IntermediateSegment implements MutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegment.class);
+
+  // Max number of values per row passed to the multi-value forward index.
+  private static final int MAX_MULTI_VALUES_PER_ROW = 1000;
+  // Capacity passed to the forward indexes; also caps the initial dictionary cardinality estimate.
+  private static final int DEFAULT_CAPACITY = 100_000;
+  // Estimated average value size for variable-width columns, used to size the dictionary.
+  private static final int DEFAULT_EST_AVG_COL_SIZE = 32;
+  // Estimated per-column cardinality used to pre-size the dictionary.
+  private static final int DEFAULT_EST_CARDINALITY = 5000;
+
+  private final SegmentGeneratorConfig _segmentGeneratorConfig;
+  private final Schema _schema;
+  private final TableConfig _tableConfig;
+  private final String _segmentName;
+  private final PartitionFunction _partitionFunction;
+  private final String _partitionColumn;
+  private final Map<String, IntermediateIndexContainer> _indexContainerMap = new HashMap<>();
+  private final PinotDataBufferMemoryManager _memoryManager;
+
+  private final int _capacity = DEFAULT_CAPACITY;
+  private volatile int _numDocsIndexed = 0;
+
+  public IntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig) {
+    _segmentGeneratorConfig = segmentGeneratorConfig;
+    _schema = segmentGeneratorConfig.getSchema();
+    _tableConfig = segmentGeneratorConfig.getTableConfig();
+    _segmentName = _segmentGeneratorConfig.getSegmentName();
+
+    // Snapshot of the schema's field specs; each one gets an index container below.
+    Collection<FieldSpec> fieldSpecs =
+        Collections.unmodifiableCollection(new ArrayList<>(_schema.getAllFieldSpecs()));
+
+    // Only the first column of the partition config is used as the partition column.
+    SegmentPartitionConfig segmentPartitionConfig = segmentGeneratorConfig.getSegmentPartitionConfig();
+    if (segmentPartitionConfig != null) {
+      Map<String, ColumnPartitionConfig> segmentPartitionConfigColumnPartitionMap =
+          segmentPartitionConfig.getColumnPartitionMap();
+      _partitionColumn = segmentPartitionConfigColumnPartitionMap.keySet().iterator().next();
+      _partitionFunction = PartitionFunctionFactory
+          .getPartitionFunction(segmentPartitionConfig.getFunctionName(_partitionColumn),
+              segmentPartitionConfig.getNumPartitions(_partitionColumn));
+    } else {
+      _partitionColumn = null;
+      _partitionFunction = null;
+    }
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    boolean offHeap = indexLoadingConfig.isRealtimeOffHeapAllocation();
+    boolean directOffHeap = indexLoadingConfig.isDirectRealtimeOffHeapAllocation();
+    if (offHeap && !directOffHeap) {
+      // NOTE(review): the first argument (presumably the backing directory) is null here; confirm where the mmap
+      //               files end up.
+      _memoryManager = new MmapMemoryManager(null, _segmentName, null);
+    } else {
+      _memoryManager = new DirectMemoryManager(_segmentName, null);
+    }
+
+    // Initialize for each column
+    for (FieldSpec fieldSpec : fieldSpecs) {
+      String column = fieldSpec.getName();
+
+      // Partition info
+      PartitionFunction partitionFunction = null;
+      Set<Integer> partitions = null;
+      if (column.equals(_partitionColumn)) {
+        partitionFunction = _partitionFunction;
+        partitions = new HashSet<>();
+        partitions.add(segmentGeneratorConfig.getSequenceId());
+      }
+
+      FieldSpec.DataType dataType = fieldSpec.getDataType();
+      boolean isFixedWidthColumn = dataType.isFixedWidth();
+      MutableForwardIndex forwardIndex;
+      MutableDictionary dictionary;
+
+      int dictionaryColumnSize;
+      if (isFixedWidthColumn) {
+        dictionaryColumnSize = dataType.size();
+      } else {
+        dictionaryColumnSize = DEFAULT_EST_AVG_COL_SIZE;
+      }
+      // NOTE: preserve 10% buffer for cardinality to reduce the chance of re-sizing the dictionary
+      int estimatedCardinality = (int) (DEFAULT_EST_CARDINALITY * 1.1);
+      String dictionaryAllocationContext =
+          buildAllocationContext(_segmentName, column, V1Constants.Dict.FILE_EXTENSION);
+      // Every column is dictionary-encoded in the intermediate segment.
+      dictionary = MutableDictionaryFactory
+          .getMutableDictionary(dataType, offHeap, _memoryManager, dictionaryColumnSize,
+              Math.min(estimatedCardinality, _capacity), dictionaryAllocationContext);
+
+      if (fieldSpec.isSingleValueField()) {
+        // Single-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION);
+        forwardIndex = new FixedByteSVMutableForwardIndex(true, FieldSpec.DataType.INT, _capacity, _memoryManager,
+            allocationContext);
+      } else {
+        // Multi-value dictionary-encoded forward index
+        String allocationContext =
+            buildAllocationContext(_segmentName, column, V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+        // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
+        forwardIndex = new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW,
+            indexLoadingConfig.getRealtimeAvgMultiValueCount(), _capacity, Integer.BYTES, _memoryManager,
+            allocationContext);
+      }
+
+      _indexContainerMap.put(column,
+          new IntermediateIndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex,
+              dictionary));
+    }
+  }
+
+  /**
+   * Indexes one row: first adds its values to the dictionaries, then appends the resulting dictionary ids to the
+   * forward indexes at doc id {@code _numDocsIndexed}.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
+      throws IOException {
+    updateDictionary(row);
+    addNewRow(row);
+    _numDocsIndexed++;
+    return true;
+  }
+
+  @Override
+  public int getNumDocsIndexed() {
+    return _numDocsIndexed;
+  }
+
+  @Override
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /**
+   * Not supported by the intermediate segment; always returns {@code null}.
+   */
+  @Override
+  public SegmentMetadata getSegmentMetadata() {
+    return null;
+  }
+
+  @Override
+  public Set<String> getColumnNames() {
+    return _schema.getColumnNames();
+  }
+
+  @Override
+  public Set<String> getPhysicalColumnNames() {
+    return _schema.getPhysicalColumnNames();
+  }
+
+  /**
+   * Returns a data source over the first {@code _numDocsIndexed} docs of the given column. An unknown column
+   * results in a {@link NullPointerException}.
+   */
+  @Override
+  public DataSource getDataSource(String columnName) {
+    return _indexContainerMap.get(columnName).toDataSource(_numDocsIndexed);
+  }
+
+  /**
+   * Star-trees are not built for the intermediate segment; always returns {@code null}.
+   */
+  @Override
+  public List<StarTreeV2> getStarTrees() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public ValidDocIndexReader getValidDocIndex() {
+    return null;
+  }
+
+  /**
+   * Fills {@code reuse} with the dictionary-decoded value of every column for {@code docId}. Multi-value columns are
+   * returned as {@code Object[]}.
+   */
+  @Override
+  public GenericRow getRecord(int docId, GenericRow reuse) {
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IntermediateIndexContainer indexContainer = entry.getValue();
+      Object value = getValue(docId, indexContainer.getForwardIndex(), indexContainer.getDictionary(),
+          indexContainer.getNumValuesInfo().getMaxNumValuesPerMVEntry());
+      reuse.putValue(column, value);
+    }
+    return reuse;
+  }
+
+  /**
+   * Closes every per-column index container and then the memory manager that backs them. Failures are logged and
+   * do not stop the remaining resources from being released.
+   */
+  @Override
+  public void destroy() {
+    String segmentName = getSegmentName();
+    LOGGER.info("Trying to destroy segment : {}", segmentName);
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      try {
+        entry.getValue().close();
+      } catch (IOException e) {
+        LOGGER.error("Failed to close indexes for column: {}. Continuing with error.", entry.getKey(), e);
+      }
+    }
+    // NOTE: Close the memory manager as the last step. All forward indexes and dictionaries were allocated through
+    //       it, and its close() releases those buffers; skipping this leaks the segment's memory.
+    try {
+      _memoryManager.close();
+    } catch (IOException e) {
+      LOGGER.error("Failed to close the memory manager for segment: {}", segmentName, e);
+    }
+  }
+
+  /**
+   * Adds the row's values to each column's dictionary, stashes the resulting dictionary id(s) in the column's index
+   * container for {@link #addNewRow}, and refreshes the container's min/max from the dictionary.
+   */
+  private void updateDictionary(GenericRow row) {
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IntermediateIndexContainer indexContainer = entry.getValue();
+      Object value = row.getValue(column);
+      MutableDictionary dictionary = indexContainer.getDictionary();
+      if (dictionary != null) {
+        if (indexContainer.getFieldSpec().isSingleValueField()) {
+          indexContainer.setDictId(dictionary.index(value));
+        } else {
+          indexContainer.setDictIds(dictionary.index((Object[]) value));
+        }
+
+        // Update min/max value from dictionary
+        indexContainer.setMinValue(dictionary.getMinVal());
+        indexContainer.setMaxValue(dictionary.getMaxVal());
+      }
+    }
+  }
+
+  /**
+   * Writes the row at doc id {@code _numDocsIndexed}. It uses the dictionary ids stashed by
+   * {@link #updateDictionary}, or raw values for a single-value column without a dictionary id. The constructor
+   * currently gives every column a dictionary, so the raw branch is defensive.
+   */
+  private void addNewRow(GenericRow row)
+      throws IOException {
+    int docId = _numDocsIndexed;
+    for (Map.Entry<String, IntermediateIndexContainer> entry : _indexContainerMap.entrySet()) {
+      String column = entry.getKey();
+      IntermediateIndexContainer indexContainer = entry.getValue();
+      Object value = row.getValue(column);
+      FieldSpec fieldSpec = indexContainer.getFieldSpec();
+      if (fieldSpec.isSingleValueField()) {
+        // Update numValues info
+        indexContainer.getNumValuesInfo().updateSVEntry();
+
+        // Update indexes
+        MutableForwardIndex forwardIndex = indexContainer.getForwardIndex();
+        int dictId = indexContainer.getDictId();
+        if (dictId >= 0) {
+          // Dictionary-encoded single-value column
+
+          // Update forward index
+          forwardIndex.setDictId(docId, dictId);
+        } else {
+          // Single-value column with raw index
+
+          // Update forward index
+          FieldSpec.DataType dataType = fieldSpec.getDataType();
+          switch (dataType) {
+            case INT:
+              forwardIndex.setInt(docId, (Integer) value);
+              break;
+            case LONG:
+              forwardIndex.setLong(docId, (Long) value);
+              break;
+            case FLOAT:
+              forwardIndex.setFloat(docId, (Float) value);
+              break;
+            case DOUBLE:
+              forwardIndex.setDouble(docId, (Double) value);
+              break;
+            case STRING:
+              forwardIndex.setString(docId, (String) value);
+              break;
+            case BYTES:
+              forwardIndex.setBytes(docId, (byte[]) value);
+              break;
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported data type: " + dataType + " for no-dictionary column: " + column);
+          }
+
+          // Update min/max value from raw value
+          // NOTE: Skip updating min/max value for aggregated metrics because the value will change over time.
+          if (fieldSpec.getFieldType() != FieldSpec.FieldType.METRIC) {
+            Comparable comparable;
+            if (dataType == FieldSpec.DataType.BYTES) {
+              comparable = new ByteArray((byte[]) value);
+            } else {
+              comparable = (Comparable) value;
+            }
+            if (indexContainer.getMinValue() == null) {
+              indexContainer.setMinValue(comparable);
+              indexContainer.setMaxValue(comparable);
+            } else {
+              if (comparable.compareTo(indexContainer.getMinValue()) < 0) {
+                indexContainer.setMinValue(comparable);
+              }
+              if (comparable.compareTo(indexContainer.getMaxValue()) > 0) {
+                indexContainer.setMaxValue(comparable);
+              }
+            }
+          }
+        }
+      } else {
+        // Multi-value column (always dictionary-encoded)
+        int[] dictIds = indexContainer.getDictIds();
+
+        // Update numValues info
+        indexContainer.getNumValuesInfo().updateMVEntry(dictIds.length);
+
+        // Update forward index
+        indexContainer.getForwardIndex().setDictIdMV(docId, dictIds);
+      }
+    }
+  }
+
+  /**
+   * Builds the allocation-context label ({@code segmentName:columnName<indexType>}) handed to the memory manager.
+   */
+  private static String buildAllocationContext(String segmentName, String columnName, String indexType) {
+    return segmentName + ":" + columnName + indexType;
+  }
+
+  /**
+   * Helper method to read the dictionary-decoded value for the given document id.
+   *
+   * @param maxNumMultiValues size of the scratch buffer used to read a multi-value entry; must be at least the
+   *                          largest number of values stored in any row of this column
+   * @return the single value, or an {@code Object[]} for multi-value columns
+   */
+  private static Object getValue(int docId, MutableForwardIndex forwardIndex, MutableDictionary dictionary,
+      int maxNumMultiValues) {
+    // Dictionary based
+    if (forwardIndex.isSingleValue()) {
+      int dictId = forwardIndex.getDictId(docId);
+      return dictionary.get(dictId);
+    } else {
+      int[] dictIds = new int[maxNumMultiValues];
+      int numValues = forwardIndex.getDictIdMV(docId, dictIds);
+      Object[] value = new Object[numValues];
+      for (int i = 0; i < numValues; i++) {
+        value[i] = dictionary.get(dictIds[i]);
+      }
+      return value;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
index 2687ad8..e3612f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
@@ -79,7 +79,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager implements PinotDataBuff
     PinotDataBuffer buffer = allocateInternal(size, allocationContext);
     _totalAllocatedBytes += size;
     _buffers.add(buffer);
-    _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
+    if (_serverMetrics != null) {
+      _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, size);
+    }
     return buffer;
   }
 
@@ -104,7 +106,9 @@ public abstract class RealtimeIndexOffHeapMemoryManager implements PinotDataBuff
     for (PinotDataBuffer buffer : _buffers) {
       buffer.close();
     }
-    _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes);
+    if (_serverMetrics != null) {
+      _serverMetrics.addValueToTableGauge(_tableName, ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, -_totalAllocatedBytes);
+    }
     doClose();
     _buffers.clear();
     _totalAllocatedBytes = 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
similarity index 97%
rename from pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
index 8c4c93a..8b76bda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableColumnStatistics.java
@@ -33,7 +33,7 @@ import org.apache.pinot.spi.data.FieldSpec;
  *
  * TODO: Gather more info on the fly to avoid scanning the segment
  */
-public class RealtimeColumnStatistics implements ColumnStatistics {
+public class MutableColumnStatistics implements ColumnStatistics {
   private final DataSource _dataSource;
   private final int[] _sortedDocIdIterationOrder;
 
@@ -41,7 +41,7 @@ public class RealtimeColumnStatistics implements ColumnStatistics {
   //       dictionary.
   private final Dictionary _dictionary;
 
-  public RealtimeColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) {
+  public MutableColumnStatistics(DataSource dataSource, int[] sortedDocIdIterationOrder) {
     _dataSource = dataSource;
     _sortedDocIdIterationOrder = sortedDocIdIterationOrder;
     _dictionary = dataSource.getDictionary();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
similarity index 95%
rename from pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
rename to pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index d639b17..109884d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeNoDictionaryColStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -28,11 +28,11 @@ import org.apache.pinot.core.segment.creator.ColumnStatistics;
 import static org.apache.pinot.core.common.Constants.UNKNOWN_CARDINALITY;
 
 
-public class RealtimeNoDictionaryColStatistics implements ColumnStatistics {
+public class MutableNoDictionaryColStatistics implements ColumnStatistics {
   private final DataSourceMetadata _dataSourceMetadata;
   private final MutableForwardIndex _forwardIndex;
 
-  public RealtimeNoDictionaryColStatistics(DataSource dataSource) {
+  public MutableNoDictionaryColStatistics(DataSource dataSource) {
     _dataSourceMetadata = dataSource.getDataSourceMetadata();
     _forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java
index bedf50c..d9b382d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentStatsContainer.java
@@ -42,10 +42,10 @@ public class RealtimeSegmentStatsContainer implements SegmentPreIndexStatsContai
     for (String columnName : realtimeSegment.getPhysicalColumnNames()) {
       DataSource dataSource = realtimeSegment.getDataSource(columnName);
       if (dataSource.getDictionary() != null) {
-        _columnStatisticsMap.put(columnName, new RealtimeColumnStatistics(realtimeSegment.getDataSource(columnName),
+        _columnStatisticsMap.put(columnName, new MutableColumnStatistics(realtimeSegment.getDataSource(columnName),
             realtimeSegmentRecordReader.getSortedDocIdIterationOrder()));
       } else {
-        _columnStatisticsMap.put(columnName, new RealtimeNoDictionaryColStatistics(dataSource));
+        _columnStatisticsMap.put(columnName, new MutableNoDictionaryColStatistics(dataSource));
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java
new file mode 100644
index 0000000..7a27ed5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentSegmentCreationDataSource.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator;
+
+import org.apache.pinot.common.Utils;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class IntermediateSegmentSegmentCreationDataSource implements SegmentCreationDataSource {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateSegmentSegmentCreationDataSource.class);
+  private final IntermediateSegment _intermediateSegment;
+  private final IntermediateSegmentRecordReader _intermediateSegmentRecordReader;
+
+
+  public IntermediateSegmentSegmentCreationDataSource(IntermediateSegmentRecordReader intermediateSegmentRecordReader) {
+    _intermediateSegmentRecordReader = intermediateSegmentRecordReader;
+    _intermediateSegment = _intermediateSegmentRecordReader.getIntermediateSegment();
+  }
+
+  @Override
+  public SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig) {
+
+    return new IntermediateSegmentStatsContainer(_intermediateSegment);
+  }
+
+  @Override
+  public RecordReader getRecordReader() {
+    try {
+      _intermediateSegmentRecordReader.rewind();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while rewinding record reader", e);
+      Utils.rethrowException(e);
+    }
+    return _intermediateSegmentRecordReader;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java
new file mode 100644
index 0000000..3e29d52
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/IntermediateSegmentStatsContainer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.core.realtime.converter.stats.MutableColumnStatistics;
+
+
+public class IntermediateSegmentStatsContainer implements SegmentPreIndexStatsContainer {
+  private final IntermediateSegment _intermediateSegment;
+  private final Map<String, ColumnStatistics> _columnStatisticsMap = new HashMap<>();
+
+  /** Eagerly builds one {@link MutableColumnStatistics} per physical column of the given intermediate segment. */
+  public IntermediateSegmentStatsContainer(IntermediateSegment intermediateSegment) {
+    _intermediateSegment = intermediateSegment;
+    for (String column : intermediateSegment.getPhysicalColumnNames()) {
+      DataSource columnDataSource = intermediateSegment.getDataSource(column);
+      // Stats are always dictionary-based here; NOTE(review): null presumably means "no pre-sorted doc ids".
+      _columnStatisticsMap.put(column, new MutableColumnStatistics(columnDataSource, null));
+    }
+  }
+
+  @Override
+  public ColumnStatistics getColumnProfileFor(String column) {
+    return _columnStatisticsMap.get(column); // null for names that are not physical columns of the segment
+  }
+
+  /** Reports the number of documents indexed into the intermediate segment at the time of the call. */
+  @Override
+  public int getTotalDocCount() {
+    return _intermediateSegment.getNumDocsIndexed();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index d2b4e52..3c79af4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -602,7 +602,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex));
     properties.setProperty(getKeyFor(column, HAS_JSON_INDEX), String.valueOf(hasJsonIndex));
     properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
-    properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS),
+    properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
         String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
     properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
         String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3078dc7..1efabfc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
 import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
 import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
@@ -39,6 +40,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.creator.ColumnIndexCreationInfo;
 import org.apache.pinot.core.segment.creator.ColumnStatistics;
+import org.apache.pinot.core.segment.creator.IntermediateSegmentSegmentCreationDataSource;
 import org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.core.segment.creator.SegmentCreationDataSource;
 import org.apache.pinot.core.segment.creator.SegmentCreator;
@@ -136,8 +138,25 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
+  /**
+   * Initializes the driver from a record reader, using the default record transformer for the table config/schema.
+   * If {@code config.isIntermediateSegmentRecordReader()} is true, {@code recordReader} must be an
+   * {@link IntermediateSegmentRecordReader}; otherwise it is wrapped in a {@link RecordReaderSegmentCreationDataSource}.
+   *
+   * @throws IllegalArgumentException if the config expects an intermediate segment reader but another is given
+   */
   public void init(SegmentGeneratorConfig config, RecordReader recordReader)
       throws Exception {
-    init(config, new RecordReaderSegmentCreationDataSource(recordReader),
-        CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));
+    SegmentCreationDataSource dataSource;
+    if (config.isIntermediateSegmentRecordReader()) {
+      // Fail fast with a descriptive message instead of an opaque ClassCastException on a config/reader mismatch.
+      if (!(recordReader instanceof IntermediateSegmentRecordReader)) {
+        throw new IllegalArgumentException("Config expects an IntermediateSegmentRecordReader but got: "
+            + (recordReader == null ? "null" : recordReader.getClass().getName()));
+      }
+      dataSource = new IntermediateSegmentSegmentCreationDataSource((IntermediateSegmentRecordReader) recordReader);
+    } else {
+      dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
+    }
+    init(config, dataSource, CompositeTransformer.getDefaultTransformer(config.getTableConfig(), config.getSchema()));
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index b22ceac..82d4ae1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -82,7 +82,7 @@ public class V1Constants {
       public static final String HAS_FST_INDEX = "hasFSTIndex";
       public static final String HAS_JSON_INDEX = "hasJsonIndex";
       public static final String IS_SINGLE_VALUED = "isSingleValues";
-      public static final String MAX_MULTI_VALUE_ELEMTS = "maxNumberOfMultiValues";
+      public static final String MAX_MULTI_VALUE_ELEMENTS = "maxNumberOfMultiValues";
       public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries";
       public static final String IS_AUTO_GENERATED = "isAutoGenerated";
       public static final String DEFAULT_NULL_VALUE = "defaultNullValue";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java
new file mode 100644
index 0000000..7c8263f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/IntermediateIndexContainer.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.column;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.segment.index.datasource.MutableDataSource;
+import org.apache.pinot.core.segment.index.readers.MutableDictionary;
+import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class IntermediateIndexContainer implements Closeable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateIndexContainer.class);
+
+  final FieldSpec _fieldSpec;
+  final PartitionFunction _partitionFunction;
+  final Set<Integer> _partitions;
+  final NumValuesInfo _numValuesInfo;
+  final MutableForwardIndex _forwardIndex;
+  final MutableDictionary _dictionary;
+
+  volatile Comparable _minValue;
+  volatile Comparable _maxValue;
+
+  // Hold the dictionary id for the latest record
+  int _dictId = Integer.MIN_VALUE;
+  int[] _dictIds;
+
+  public IntermediateIndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
+      @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
+      MutableDictionary dictionary) {
+    _fieldSpec = fieldSpec;
+    _partitionFunction = partitionFunction;
+    _partitions = partitions;
+    _numValuesInfo = numValuesInfo;
+    _forwardIndex = forwardIndex;
+    _dictionary = dictionary;
+  }
+
+  public DataSource toDataSource(int numDocsIndexed) {
+    return new MutableDataSource(_fieldSpec, numDocsIndexed, _numValuesInfo._numValues,
+        _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex,
+        _dictionary, null, null, null, false, null, null, null);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    String column = _fieldSpec.getName();
+    try {
+      _forwardIndex.close();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while closing forward index for column: {}, continuing with error", column, e);
+    }
+    if (_dictionary != null) {
+      try {
+        _dictionary.close();
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while closing dictionary for column: {}, continuing with error", column, e);
+      }
+    }
+  }
+
+  public FieldSpec getFieldSpec() {
+    return _fieldSpec;
+  }
+
+  public NumValuesInfo getNumValuesInfo() {
+    return _numValuesInfo;
+  }
+
+  public MutableForwardIndex getForwardIndex() {
+    return _forwardIndex;
+  }
+
+  public MutableDictionary getDictionary() {
+    return _dictionary;
+  }
+
+  public int getDictId() {
+    return _dictId;
+  }
+
+  public void setDictId(int dictId) {
+    _dictId = dictId;
+  }
+
+  public int[] getDictIds() {
+    return _dictIds;
+  }
+
+  public void setDictIds(int[] dictIds) {
+    _dictIds = dictIds;
+  }
+
+  public Comparable getMinValue() {
+    return _minValue;
+  }
+
+  public void setMinValue(Comparable minValue) {
+    _minValue = minValue;
+  }
+
+  public Comparable getMaxValue() {
+    return _maxValue;
+  }
+
+  public void setMaxValue(Comparable maxValue) {
+    _maxValue = maxValue;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java
new file mode 100644
index 0000000..6992c8f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/NumValuesInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.column;
+
+public class NumValuesInfo {
+  volatile int _numValues = 0;
+  volatile int _maxNumValuesPerMVEntry = 0;
+
+  public void updateSVEntry() {
+    _numValues++;
+  }
+
+  public void updateMVEntry(int numValuesInMVEntry) {
+    _numValues += numValuesInMVEntry;
+    _maxNumValuesPerMVEntry = Math.max(_maxNumValuesPerMVEntry, numValuesInMVEntry);
+  }
+
+  public int getNumValues() {
+    return _numValues;
+  }
+
+  public int getMaxNumValuesPerMVEntry() {
+    return _maxNumValuesPerMVEntry;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index e307aaf..6056f6a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -194,6 +194,9 @@ public class IndexLoadingConfig {
   }
 
   private void extractFromInstanceConfig(InstanceDataManagerConfig instanceDataManagerConfig) {
+    if (instanceDataManagerConfig == null) {
+      return;
+    }
     ReadMode instanceReadMode = instanceDataManagerConfig.getReadMode();
     if (instanceReadMode != null) {
       _readMode = instanceReadMode;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
index 43990ab..b67b0fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
@@ -100,7 +100,7 @@ public class ColumnMetadata {
     builder.setHasInvertedIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX)));
     builder.setHasFSTIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX), false));
     builder.setSingleValue(config.getBoolean(getKeyFor(column, IS_SINGLE_VALUED)));
-    builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS)));
+    builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS)));
     builder.setTotalNumberOfEntries(config.getInt(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES)));
     builder.setAutoGenerated(config.getBoolean(getKeyFor(column, IS_AUTO_GENERATED), false));
     builder.setDefaultNullValueString(config.getString(getKeyFor(column, DEFAULT_NULL_VALUE), null));
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java
new file mode 100644
index 0000000..1220beb
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/IntermediateSegmentTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.data.readers.IntermediateSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.indexsegment.mutable.IntermediateSegment;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+
+/**
+ * Verifies that a segment built through an {@link IntermediateSegment} is equivalent to one built directly from the
+ * Avro input file: same columns, dictionaries and inverted indexes.
+ */
+public class IntermediateSegmentTest {
+  private static final String AVRO_DATA_SV = "data/test_data-sv.avro";
+  private static final String AVRO_DATA_MV = "data/test_data-mv.avro";
+  private static final String SEGMENT_NAME = "testSegmentName";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "IntermediateSegmentTest");
+
+  @BeforeMethod
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  @AfterMethod
+  public void tearDown() {
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  @DataProvider(name = "segmentCreationTestCases")
+  private static Object[][] createSegmentCreationTestCases()
+      throws IOException {
+    return new Object[][]{
+        {AVRO_DATA_SV, createSchema(AVRO_DATA_SV), createTableConfig(AVRO_DATA_SV)},
+        {AVRO_DATA_MV, createSchema(AVRO_DATA_MV), createTableConfig(AVRO_DATA_MV)}
+    };
+  }
+
+  /**
+   * Builds a segment from the same input twice (via an intermediate segment and via the Avro record reader) and
+   * checks that both results are equivalent.
+   */
+  @Test(dataProvider = "segmentCreationTestCases")
+  public void testOfflineSegmentCreationFromDifferentWays(String inputFile, Schema schema, TableConfig tableConfig)
+      throws Exception {
+    // Get resource file path.
+    URL resource = getClass().getClassLoader().getResource(inputFile);
+    assertNotNull(resource);
+    String filePath = resource.getFile();
+
+    // Create the segment generator config.
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(filePath);
+    segmentGeneratorConfig.setTableName("testTable");
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(Arrays.asList("column6", "column7"));
+
+    IndexSegment segmentFromIntermediateSegment = null;
+    IndexSegment segmentFromAvroRecordReader = null;
+    try {
+      segmentFromIntermediateSegment = buildSegmentFromIntermediateSegment(segmentGeneratorConfig);
+      segmentFromAvroRecordReader = buildSegmentFromAvroRecordReader(segmentGeneratorConfig);
+      assertNotNull(segmentFromIntermediateSegment);
+      assertNotNull(segmentFromAvroRecordReader);
+      assertSegmentsEquivalent(segmentFromIntermediateSegment, segmentFromAvroRecordReader);
+    } finally {
+      // Release the loaded segments even when an assertion above fails.
+      if (segmentFromIntermediateSegment != null) {
+        segmentFromIntermediateSegment.destroy();
+      }
+      if (segmentFromAvroRecordReader != null) {
+        segmentFromAvroRecordReader.destroy();
+      }
+    }
+  }
+
+  /** Asserts that two segments expose the same columns, dictionaries and inverted indexes. */
+  private static void assertSegmentsEquivalent(IndexSegment actualSegment, IndexSegment expectedSegment) {
+    assertEquals(actualSegment.getColumnNames(), expectedSegment.getColumnNames());
+    Set<String> physicalColumns = actualSegment.getPhysicalColumnNames();
+    assertEquals(physicalColumns, expectedSegment.getPhysicalColumnNames());
+
+    // Comparison for every column
+    for (String column : physicalColumns) {
+      DataSource actualDataSource = actualSegment.getDataSource(column);
+      DataSource expectedDataSource = expectedSegment.getDataSource(column);
+
+      // Comparison for dictionaries.
+      Dictionary actualDictionary = actualDataSource.getDictionary();
+      Dictionary expectedDictionary = expectedDataSource.getDictionary();
+      assertNotNull(actualDictionary, "Missing dictionary for column: " + column);
+      assertNotNull(expectedDictionary, "Missing dictionary for column: " + column);
+      assertEquals(actualDictionary.getMinVal(), expectedDictionary.getMinVal());
+      assertEquals(actualDictionary.getMaxVal(), expectedDictionary.getMaxVal());
+      assertEquals(actualDictionary.getValueType(), expectedDictionary.getValueType());
+      assertEquals(actualDictionary.length(), expectedDictionary.length());
+      int dictionaryLength = actualDictionary.length();
+      for (int dictId = 0; dictId < dictionaryLength; dictId++) {
+        assertEquals(actualDictionary.get(dictId), expectedDictionary.get(dictId));
+      }
+
+      // Comparison for inverted index: both segments must agree on whether one exists, not just on its contents.
+      InvertedIndexReader actualInvertedIndexReader = actualDataSource.getInvertedIndex();
+      InvertedIndexReader expectedInvertedIndexReader = expectedDataSource.getInvertedIndex();
+      assertEquals(actualInvertedIndexReader != null, expectedInvertedIndexReader != null,
+          "Inverted index presence mismatch for column: " + column);
+      if (actualInvertedIndexReader != null) {
+        for (int dictId = 0; dictId < dictionaryLength; dictId++) {
+          assertEquals(actualInvertedIndexReader.getDocIds(dictId), expectedInvertedIndexReader.getDocIds(dictId));
+        }
+      }
+    }
+  }
+
+  private IndexSegment buildSegmentFromIntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig)
+      throws Exception {
+    // Set intermediate segment record reader.
+    segmentGeneratorConfig.setIntermediateSegmentRecordReader(true);
+    String segmentName = SEGMENT_NAME + "_from_intermediate_segment";
+    segmentGeneratorConfig.setSegmentName(segmentName);
+
+    IntermediateSegment intermediateSegment = new IntermediateSegment(segmentGeneratorConfig);
+    try {
+      // Ingest data.
+      ingestDataToIntermediateSegment(segmentGeneratorConfig, intermediateSegment);
+      IntermediateSegmentRecordReader intermediateSegmentRecordReader =
+          new IntermediateSegmentRecordReader(intermediateSegment);
+
+      // Build the segment from intermediate segment.
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      driver.init(segmentGeneratorConfig, intermediateSegmentRecordReader);
+      driver.build();
+    } finally {
+      // Destroy the intermediate segment after segment creation, including when creation fails.
+      intermediateSegment.destroy();
+    }
+
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap);
+  }
+
+  private IndexSegment buildSegmentFromAvroRecordReader(SegmentGeneratorConfig segmentGeneratorConfig)
+      throws Exception {
+    // Reset default value to use avro record reader
+    segmentGeneratorConfig.setIntermediateSegmentRecordReader(false);
+    String segmentName = SEGMENT_NAME + "_from_avro_reader";
+    segmentGeneratorConfig.setSegmentName(segmentName);
+
+    // Build the index segment.
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap);
+  }
+
+  /** Indexes every record of the configured Avro input file into {@code intermediateSegment}. */
+  private void ingestDataToIntermediateSegment(SegmentGeneratorConfig segmentGeneratorConfig,
+      IntermediateSegment intermediateSegment)
+      throws IOException {
+    // try-with-resources closes the Avro reader once ingestion finishes or fails.
+    try (AvroRecordReader avroRecordReader = new AvroRecordReader()) {
+      avroRecordReader.init(new File(segmentGeneratorConfig.getInputFilePath()), null, null);
+
+      GenericRow genericRow = new GenericRow();
+      while (avroRecordReader.hasNext()) {
+        genericRow.clear();
+        genericRow = avroRecordReader.next(genericRow);
+        intermediateSegment.index(genericRow, null);
+      }
+    }
+  }
+
+  private static Schema createSchema(String inputFile)
+      throws IOException {
+    Schema schema;
+    if (AVRO_DATA_SV.equals(inputFile)) {
+      schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+          .addMetric("column3", FieldSpec.DataType.INT).addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("column6", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column7", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column9", FieldSpec.DataType.INT)
+          .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
+          .addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
+          .addMetric("column18", FieldSpec.DataType.INT)
+          .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+    } else {
+      URL resource = IntermediateSegmentTest.class.getClassLoader().getResource(inputFile);
+      assertNotNull(resource);
+      String filePath = resource.getFile();
+      schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(new File(filePath));
+    }
+    return schema;
+  }
+
+  private static TableConfig createTableConfig(String inputFile) {
+    TableConfig tableConfig;
+    if (AVRO_DATA_SV.equals(inputFile)) {
+      tableConfig =
+          new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
+              .setInvertedIndexColumns(Arrays.asList("column6", "column7", "column11", "column17", "column18")).build();
+    } else {
+      tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    }
+    return tableConfig;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org