Posted to commits@pinot.apache.org by ja...@apache.org on 2020/09/19 18:56:54 UTC

[incubator-pinot] branch master updated: Handle the partitioning mismatch between table config and stream (#6031)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 919f407  Handle the partitioning mismatch between table config and stream (#6031)
919f407 is described below

commit 919f40764b1057e80be1e96daed8af929601360f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sat Sep 19 11:56:44 2020 -0700

    Handle the partitioning mismatch between table config and stream (#6031)
    
    In the consuming segment, update the partition info when ingesting new records. Log a warning and emit a **REALTIME_PARTITION_MISMATCH** metric when a record's partition does not align with the stream partition. The updated partition info is persisted in the segment metadata, and when the segment is committed, the partition info stored in the segment ZK metadata is updated as well.
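    
    A simplified sketch of the per-record check added in `MutableSegmentImpl.index()` (names as in the diff below; `value` is the ingested record's partition-column value):
    
    ```java
    // For the partition column, compute the partition of each ingested value and
    // track it in a concurrent set; a newly seen partition means the records are
    // not partitioned as declared, so warn and bump the mismatch meter.
    int partition = _partitionFunction.getPartition(value);
    if (indexContainer._partitions.add(partition)) {
      _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, value);
      if (_serverMetrics != null) {
        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
      }
    }
    ```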
    
    Added `SegmentPartitionLLCRealtimeClusterIntegrationTest` to test the expected behavior.
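    
    For reference, the test deliberately mis-declares the partitioning: the table config claims 5 `murmur` partitions on the `Carrier` column while the Kafka topic only has 2 partitions, so completed segments should report 2 partitions covering `{0, 1}` (excerpt from the test setup in the diff below):
    
    ```java
    // Declare 5 "murmur" partitions on "Carrier", mismatching the 2-partition
    // Kafka topic, and enable the partition-based segment pruner for routing.
    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
    indexingConfig.setSegmentPartitionConfig(
        new SegmentPartitionConfig(Collections.singletonMap("Carrier", new ColumnPartitionConfig("murmur", 5))));
    tableConfig.setRoutingConfig(
        new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
    ```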
    
    NOTE: Even with this fix, the consuming segment can still be pruned out incorrectly if the partition info in the table config does not align with the stream. To fix that, we could persist the partition info only for the completed segments, not for the consuming segments; some perf testing is needed to verify the performance penalty.
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  31 +++-
 .../realtime/HLRealtimeSegmentDataManager.java     |   8 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  34 ++--
 .../indexsegment/mutable/MutableSegmentImpl.java   |  50 ++++--
 .../core/realtime/impl/RealtimeSegmentConfig.java  |  22 ++-
 .../index/datasource/MutableDataSource.java        |  10 +-
 .../mutable/MutableSegmentImplTestUtils.java       |  25 ++-
 ...PartitionLLCRealtimeClusterIntegrationTest.java | 185 +++++++++++++++++++++
 .../realtime/provisioning/MemoryEstimator.java     |  50 +++---
 10 files changed, 340 insertions(+), 76 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index fbdc73e..c680661 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -39,6 +39,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
   REALTIME_OFFSET_COMMITS("commits", true),
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
+  REALTIME_PARTITION_MISMATCH("mismatch", false),
   ROWS_WITH_ERRORS("rows", false),
   LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
   LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index a06e0fb..43ea74c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -67,6 +67,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegment
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -512,6 +513,11 @@ public class PinotLLCRealtimeSegmentManager {
     committingSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
     committingSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
 
+    // Update the partition metadata based on the segment metadata
+    // NOTE: When the stream partitions change, or the records are not properly partitioned from the stream, the
+    //       partition of the segment (based on the actual consumed records) can be different from the stream partition.
+    committingSegmentZKMetadata.setPartitionMetadata(getPartitionMetadataFromSegmentMetadata(segmentMetadata));
+
     persistSegmentZKMetadata(realtimeTableName, committingSegmentZKMetadata, stat.getVersion());
     return committingSegmentZKMetadata;
   }
@@ -578,6 +584,25 @@ public class PinotLLCRealtimeSegmentManager {
     return new SegmentPartitionMetadata(partitionMetadataMap);
   }
 
+  @Nullable
+  private SegmentPartitionMetadata getPartitionMetadataFromSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
+    Map<String, ColumnPartitionMetadata> partitionMetadataMap = new HashMap<>();
+    for (Map.Entry<String, ColumnMetadata> entry : segmentMetadata.getColumnMetadataMap().entrySet()) {
+      String columnName = entry.getKey();
+      ColumnMetadata columnMetadata = entry.getValue();
+      if (columnMetadata.getPartitionFunction() != null) {
+        partitionMetadataMap.put(columnName,
+            new ColumnPartitionMetadata(columnMetadata.getPartitionFunction().toString(),
+                columnMetadata.getNumPartitions(), columnMetadata.getPartitions()));
+      }
+    }
+    if (!partitionMetadataMap.isEmpty()) {
+      return new SegmentPartitionMetadata(partitionMetadataMap);
+    } else {
+      return null;
+    }
+  }
+
   public long getCommitTimeoutMS(String realtimeTableName) {
     long commitTimeoutMS = SegmentCompletionProtocol.getMaxSegmentCommitTimeMs();
     if (_propertyStore == null) {
@@ -603,7 +628,8 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @VisibleForTesting
-  StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionId) {
+  StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria,
+      int partitionId) {
     PartitionOffsetFetcher partitionOffsetFetcher =
         new PartitionOffsetFetcher(offsetCriteria, partitionId, streamConfig);
     try {
@@ -1011,7 +1037,8 @@ public class PinotLLCRealtimeSegmentManager {
     LLCSegmentName newLLCSegmentName =
         new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    StreamPartitionMsgOffset startOffset = getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
+    StreamPartitionMsgOffset startOffset =
+        getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 74aea94..311d2a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -185,9 +185,9 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentLogger.info("Started {} stream provider", _streamConfig.getType());
     final int capacity = _streamConfig.getFlushThresholdRows();
     RealtimeSegmentConfig realtimeSegmentConfig =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentName).setStreamName(_streamConfig.getTopicName())
-            .setSchema(schema).setTimeColumnName(_timeColumnName).setCapacity(capacity)
-            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentName)
+            .setStreamName(_streamConfig.getTopicName()).setSchema(schema).setTimeColumnName(_timeColumnName)
+            .setCapacity(capacity).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(realtimeSegmentZKMetadata)
@@ -197,7 +197,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
                 indexLoadingConfig.isDirectRealtimeOffHeapAllocation(), serverMetrics))
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
             .setNullHandlingEnabled(indexingConfig.isNullHandlingEnabled()).build();
-    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig);
+    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfig, serverMetrics);
 
     _notifier = realtimeTableDataManager;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fc43ba2..1363381 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -364,7 +364,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
-    StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);  // so that we always update the metric when we enter this method.
+    StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+        .create(_currentOffset);  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
     // anymore. Remove the file if it exists.
@@ -473,7 +474,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               GenericRow transformedRow = _recordTransformer.transform((GenericRow) singleRow);
               if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
                 realtimeRowsConsumedMeter = _serverMetrics
-                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
+                    .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+                        realtimeRowsConsumedMeter);
                 indexedMessageCount++;
                 canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
               } else {
@@ -486,7 +488,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             GenericRow transformedRow = _recordTransformer.transform(decodedRow);
             if (transformedRow != null && IngestionUtils.shouldIngestRow(transformedRow)) {
               realtimeRowsConsumedMeter = _serverMetrics
-                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter);
+                  .addMeteredTableValue(_metricKeyName, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+                      realtimeRowsConsumedMeter);
               indexedMessageCount++;
               canTakeMore = _realtimeSegment.index(transformedRow, msgMetadata);
             } else {
@@ -901,8 +904,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   protected void postStopConsumedMsg(String reason) {
     do {
       SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
-      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason)
-          .withSegmentName(_segmentNameStr).withInstanceId(_instanceId);
+      params.withStreamPartitionMsgOffset(_currentOffset.toString()).withReason(reason).withSegmentName(_segmentNameStr)
+          .withInstanceId(_instanceId);
 
       SegmentCompletionProtocol.Response response = _protocolHandler.segmentStoppedConsuming(params);
       if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED) {
@@ -919,7 +922,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Retry maybe once if leader is not found.
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withStreamPartitionMsgOffset(_currentOffset.toString()).withSegmentName(_segmentNameStr)
-        .withReason(_stopReason) .withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
+        .withReason(_stopReason).withNumRows(_numRowsConsumed).withInstanceId(_instanceId);
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
@@ -1165,9 +1168,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Start new realtime segment
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentNameStr).setStreamName(_streamTopic)
-            .setSchema(_schema).setTimeColumnName(timeColumnName).setCapacity(_segmentMaxRowCount)
-            .setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType).setSegmentName(_segmentNameStr)
+            .setStreamName(_streamTopic).setSchema(_schema).setTimeColumnName(timeColumnName)
+            .setCapacity(_segmentMaxRowCount).setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
@@ -1204,6 +1207,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         String partitionColumn = entry.getKey();
         ColumnPartitionConfig columnPartitionConfig = entry.getValue();
         String partitionFunctionName = columnPartitionConfig.getFunctionName();
+
+        // NOTE: Here we compare the number of partitions from the config and the stream, and log a warning and emit a
+        //       metric when they don't match, but use the one from the stream. The mismatch could happen when the
+        //       stream partitions are changed, but the table config has not been updated to reflect the change. In
+        //       such a case, picking the number of partitions from the stream can keep the segment properly partitioned as
+        //       long as the partition function is not changed.
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
           int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
@@ -1211,6 +1220,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
                 numStreamPartitions, numPartitions);
+            _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
             numPartitions = numStreamPartitions;
           }
         } catch (Exception e) {
@@ -1219,6 +1229,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               numPartitions, e);
           makeStreamMetadataProvider("Timeout getting number of stream partitions");
         }
+
         realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
         realtimeSegmentConfigBuilder
             .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
@@ -1228,7 +1239,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       }
     }
 
-    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
     _startOffset = _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset());
     _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
     _resourceTmpDir = new File(resourceDataDir, "_tmp");
@@ -1256,7 +1267,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
-    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
+    _segmentCommitterFactory =
+        new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
 
     segmentLogger
         .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index f4fd34f..57534cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -35,6 +35,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
@@ -96,7 +98,9 @@ public class MutableSegmentImpl implements MutableSegment {
 
   private final Logger _logger;
   private final long _startTimeMillis = System.currentTimeMillis();
+  private final ServerMetrics _serverMetrics;
 
+  private final String _tableNameWithType;
   private final String _segmentName;
   private final Schema _schema;
   private final String _timeColumnName;
@@ -107,7 +111,6 @@ public class MutableSegmentImpl implements MutableSegment {
   private final RealtimeSegmentStatsHistory _statsHistory;
   private final String _partitionColumn;
   private final PartitionFunction _partitionFunction;
-  private final int _partitionId;
   private final boolean _nullHandlingEnabled;
 
   private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>();
@@ -133,7 +136,10 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, FieldSpec> _newlyAddedColumnsFieldMap = new ConcurrentHashMap();
   private final Map<String, FieldSpec> _newlyAddedPhysicalColumnsFieldMap = new ConcurrentHashMap();
 
-  public MutableSegmentImpl(RealtimeSegmentConfig config) {
+  public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
+    _serverMetrics = serverMetrics;
+
+    _tableNameWithType = config.getTableNameWithType();
     _segmentName = config.getSegmentName();
     _schema = config.getSchema();
     _timeColumnName = config.getTimeColumnName();
@@ -160,7 +166,6 @@ public class MutableSegmentImpl implements MutableSegment {
     _statsHistory = config.getStatsHistory();
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
-    _partitionId = config.getPartitionId();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
     _aggregateMetrics = config.aggregateMetrics();
 
@@ -205,10 +210,16 @@ public class MutableSegmentImpl implements MutableSegment {
 
       // Partition info
       PartitionFunction partitionFunction = null;
-      int partitionId = 0;
+      Set<Integer> partitions = null;
       if (column.equals(_partitionColumn)) {
         partitionFunction = _partitionFunction;
-        partitionId = _partitionId;
+
+        // NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record
+        //       does not match the stream partition. This could happen when the stream partitions change, or the
+        //       records are not properly partitioned from the stream. Log a warning and emit a metric if it happens,
+        //       then add the new partition into this set.
+        partitions = ConcurrentHashMap.newKeySet();
+        partitions.add(config.getPartitionId());
       }
 
       // Check whether to generate raw index for the column while consuming
@@ -297,7 +308,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
       // TODO: Support range index and bloom filter for mutable segment
       _indexContainerMap.put(column,
-          new IndexContainer(fieldSpec, partitionFunction, partitionId, new NumValuesInfo(), forwardIndex, dictionary,
+          new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary,
               invertedIndexReader, null, textIndex, null, nullValueVector));
     }
 
@@ -472,6 +483,17 @@ public class MutableSegmentImpl implements MutableSegment {
       if (fieldSpec.isSingleValueField()) {
         // Single-value column
 
+        // Check partitions
+        if (column.equals(_partitionColumn)) {
+          int partition = _partitionFunction.getPartition(value);
+          if (indexContainer._partitions.add(partition)) {
+            _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, value);
+            if (_serverMetrics != null) {
+              _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
+            }
+          }
+        }
+
         // Update numValues info
         indexContainer._numValuesInfo.updateSVEntry();
 
@@ -967,7 +989,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private class IndexContainer implements Closeable {
     final FieldSpec _fieldSpec;
     final PartitionFunction _partitionFunction;
-    final int _partitionId;
+    final Set<Integer> _partitions;
     final NumValuesInfo _numValuesInfo;
     final MutableForwardIndex _forwardIndex;
     final BaseMutableDictionary _dictionary;
@@ -984,14 +1006,14 @@ public class MutableSegmentImpl implements MutableSegment {
     int _dictId = Integer.MIN_VALUE;
     int[] _dictIds;
 
-    IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction, int partitionId,
-        NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, @Nullable BaseMutableDictionary dictionary,
-        @Nullable RealtimeInvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
-        @Nullable RealtimeLuceneTextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
-        @Nullable MutableNullValueVector nullValueVector) {
+    IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
+        @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
+        @Nullable BaseMutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
+        @Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex,
+        @Nullable BloomFilterReader bloomFilter, @Nullable MutableNullValueVector nullValueVector) {
       _fieldSpec = fieldSpec;
       _partitionFunction = partitionFunction;
-      _partitionId = partitionId;
+      _partitions = partitions;
       _numValuesInfo = numValuesInfo;
       _forwardIndex = forwardIndex;
       _dictionary = dictionary;
@@ -1004,7 +1026,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
     DataSource toDataSource() {
       return new MutableDataSource(_fieldSpec, _numDocsIndexed, _numValuesInfo._numValues,
-          _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitionId, _minValue, _maxValue, _forwardIndex,
+          _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex,
           _dictionary, _invertedIndex, _rangeIndex, _textIndex, _bloomFilter, _nullValueVector);
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 0f97048..d9c8d0f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -27,6 +27,7 @@ import org.apache.pinot.spi.data.Schema;
 
 
 public class RealtimeSegmentConfig {
+  private final String _tableNameWithType;
   private final String _segmentName;
   private final String _streamName;
   private final Schema _schema;
@@ -49,12 +50,13 @@ public class RealtimeSegmentConfig {
   private final String _consumerDir;
 
   // TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.
-  private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, String timeColumnName,
-      int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
-      Set<String> invertedIndexColumns, Set<String> textIndexColumns,
+  private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema,
+      String timeColumnName, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> textIndexColumns,
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
       int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir) {
+    _tableNameWithType = tableNameWithType;
     _segmentName = segmentName;
     _streamName = streamName;
     _schema = schema;
@@ -77,6 +79,10 @@ public class RealtimeSegmentConfig {
     _consumerDir = consumerDir;
   }
 
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
   public String getSegmentName() {
     return _segmentName;
   }
@@ -163,6 +169,7 @@ public class RealtimeSegmentConfig {
   }
 
   public static class Builder {
+    private String _tableNameWithType;
     private String _segmentName;
     private String _streamName;
     private Schema _schema;
@@ -187,6 +194,11 @@ public class RealtimeSegmentConfig {
     public Builder() {
     }
 
+    public Builder setTableNameWithType(String tableNameWithType) {
+      _tableNameWithType = tableNameWithType;
+      return this;
+    }
+
     public Builder setSegmentName(String segmentName) {
       _segmentName = segmentName;
       return this;
@@ -296,8 +308,8 @@ public class RealtimeSegmentConfig {
     }
 
     public RealtimeSegmentConfig build() {
-      return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _timeColumnName, _capacity,
-          _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
+      return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
+          _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
           _textIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn,
           _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index 3a6b3a2..a927353 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.segment.index.datasource;
 
-import java.util.Collections;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataSourceMetadata;
@@ -35,16 +34,17 @@ import org.apache.pinot.spi.data.FieldSpec;
 /**
  * The {@code MutableDataSource} class is the data source for a column in the mutable segment.
  */
+@SuppressWarnings("rawtypes")
 public class MutableDataSource extends BaseDataSource {
 
   public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-      @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+      @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
       @Nullable Comparable maxValue, ForwardIndexReader forwardIndex, @Nullable Dictionary dictionary,
       @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
       @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
       @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
-            partitionId, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter,
+            partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter,
         nullValueVector);
   }
 
@@ -59,7 +59,7 @@ public class MutableDataSource extends BaseDataSource {
     final Comparable _maxValue;
 
     MutableDataSourceMetadata(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
-        @Nullable PartitionFunction partitionFunction, int partitionId, @Nullable Comparable minValue,
+        @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
         @Nullable Comparable maxValue) {
       _fieldSpec = fieldSpec;
       _numDocs = numDocs;
@@ -67,7 +67,7 @@ public class MutableDataSource extends BaseDataSource {
       _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
       if (partitionFunction != null) {
         _partitionFunction = partitionFunction;
-        _partitions = Collections.singleton(partitionId);
+        _partitions = partitions;
       } else {
         _partitionFunction = null;
         _partitions = null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
index e4df760..e7a7363 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.indexsegment.mutable;
 
 import java.util.Set;
-import javax.annotation.Nonnull;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
@@ -35,30 +34,30 @@ public class MutableSegmentImplTestUtils {
   private MutableSegmentImplTestUtils() {
   }
 
-  private static final String STEAM_NAME = "testStream";
+  private static final String TABLE_NAME_WITH_TYPE = "testTable_REALTIME";
   private static final String SEGMENT_NAME = "testSegment";
+  private static final String STEAM_NAME = "testStream";
 
-  public static MutableSegmentImpl createMutableSegmentImpl(@Nonnull Schema schema,
-      @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> varLengthDictionaryColumns,
-      @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics) {
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, boolean aggregateMetrics) {
     return createMutableSegmentImpl(schema, noDictionaryColumns, varLengthDictionaryColumns, invertedIndexColumns,
         aggregateMetrics, false);
   }
 
-  public static MutableSegmentImpl createMutableSegmentImpl(@Nonnull Schema schema,
-      @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> varLengthDictionaryColumns,
-      @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics, boolean nullHandlingEnabled) {
+  public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<String> noDictionaryColumns,
+      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, boolean aggregateMetrics,
+      boolean nullHandlingEnabled) {
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
     when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
 
     RealtimeSegmentConfig realtimeSegmentConfig =
-        new RealtimeSegmentConfig.Builder().setSegmentName(SEGMENT_NAME).setStreamName(STEAM_NAME).setSchema(schema)
-            .setCapacity(100000).setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns)
-            .setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
-            .setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
+            .setStreamName(STEAM_NAME).setSchema(schema).setCapacity(100000).setAvgNumMultiValues(2)
+            .setNoDictionaryColumns(noDictionaryColumns).setVarLengthDictionaryColumns(varLengthDictionaryColumns)
+            .setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
             .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).build();
-    return new MutableSegmentImpl(realtimeSegmentConfig);
+    return new MutableSegmentImpl(realtimeSegmentConfig, null);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
new file mode 100644
index 0000000..c520599
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Integration test that enables segment partition for the LLC real-time table.
+ */
+public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClusterIntegrationTest {
+  // Number of documents in the first Avro file
+  private static final long NUM_DOCS = 9292;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Start Kafka
+    startKafka();
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config with reduced number of columns and partition config
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(getSchemaName()).addSingleValueDimension("Carrier", DataType.STRING)
+            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
+    addSchema(schema);
+
+    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    indexingConfig.setSegmentPartitionConfig(
+        new SegmentPartitionConfig(Collections.singletonMap("Carrier", new ColumnPartitionConfig("murmur", 5))));
+    tableConfig.setRoutingConfig(
+        new RoutingConfig(null, Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE), null));
+    addTableConfig(tableConfig);
+
+    // Push data into Kafka (only ingest the first Avro file)
+    pushAvroIntoKafka(Collections.singletonList(avroFiles.get(0)));
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return NUM_DOCS;
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getInvertedIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getNoDictionaryColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getRangeIndexColumns() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  protected List<String> getBloomFilterColumns() {
+    return null;
+  }
+
+  @Test
+  public void testPartitionMetadata() {
+    List<RealtimeSegmentZKMetadata> segmentZKMetadataList =
+        _helixResourceManager.getRealtimeSegmentMetadata(getTableName());
+    for (RealtimeSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
+      SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+      assertNotNull(segmentPartitionMetadata);
+      Map<String, ColumnPartitionMetadata> columnPartitionMetadataMap =
+          segmentPartitionMetadata.getColumnPartitionMap();
+      assertEquals(columnPartitionMetadataMap.size(), 1);
+      ColumnPartitionMetadata columnPartitionMetadata = columnPartitionMetadataMap.get("Carrier");
+      assertNotNull(columnPartitionMetadata);
+
+      // The function name should be aligned with the partition config in the table config
+      assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
+
+      if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
+        // Consuming segment
+
+        // Number of partitions should be aligned with the partition config in the table config
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 5);
+
+        // Should contain only the stream partition
+        assertEquals(columnPartitionMetadata.getPartitions(),
+            Collections.singleton(new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId()));
+      } else {
+        // Completed segment
+
+        // Number of partitions should be the same as number of stream partitions
+        assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
+
+        // Should contain the partitions based on the ingested records. Since the records are not partitioned in Kafka,
+        // it should contain all the partitions.
+        assertEquals(columnPartitionMetadata.getPartitions(), new HashSet<>(Arrays.asList(0, 1)));
+      }
+    }
+  }
+
+  // TODO: Add test on partition routing once the consuming segment behavior is fixed.
+  //       Currently the partition info is cached in the PartitionSegmentPruner, and won't be reloaded when the
+  //       consuming segment gets committed. The segment will be pruned based on the consuming segment partition info
+  //       (using stream partition as the segment partition), even if the partition info changed for the completed
+  //       segment.
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 874d4c2..ef0484f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -85,7 +85,7 @@ public class MemoryEstimator {
       throw new RuntimeException("Caught exception when reading segment index dir", e);
     }
     _totalDocsInSampleSegment = _segmentMetadata.getTotalDocs();
-    _sampleSegmentConsumedSeconds = (int)(_totalDocsInSampleSegment/ingestionRate);
+    _sampleSegmentConsumedSeconds = (int) (_totalDocsInSampleSegment / ingestionRate);
 
     if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns())) {
       _noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns());
@@ -130,7 +130,8 @@ public class MemoryEstimator {
 
     // create a config
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+            .setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
             .setSchema(_segmentMetadata.getSchema()).setCapacity(_segmentMetadata.getTotalDocs())
             .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
             .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
@@ -138,13 +139,13 @@ public class MemoryEstimator {
             .setStatsHistory(sampleStatsHistory);
 
     // create mutable segment impl
-    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
 
     // read all rows and index them
     try (PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(_sampleCompletedSegment)) {
       GenericRow row = new GenericRow();
       while (segmentRecordReader.hasNext()) {
-        segmentRecordReader.next(row);
+        row = segmentRecordReader.next(row);
         mutableSegmentImpl.index(row, null);
         row.clear();
       }
@@ -224,9 +225,10 @@ public class MemoryEstimator {
 
       memoryForConsumingSegmentPerPartition += getMemoryForInvertedIndex(memoryForConsumingSegmentPerPartition);
 
-      int numActiveSegmentsPerPartition = (retentionHours  + numHoursToConsume - 1)/numHoursToConsume;
-      long activeMemoryForCompletedSegmentsPerPartition = completedSegmentSizeBytes * (numActiveSegmentsPerPartition - 1);
-      int numCompletedSegmentsPerPartition = (_tableRetentionHours + numHoursToConsume - 1)/numHoursToConsume - 1;
+      int numActiveSegmentsPerPartition = (retentionHours + numHoursToConsume - 1) / numHoursToConsume;
+      long activeMemoryForCompletedSegmentsPerPartition =
+          completedSegmentSizeBytes * (numActiveSegmentsPerPartition - 1);
+      int numCompletedSegmentsPerPartition = (_tableRetentionHours + numHoursToConsume - 1) / numHoursToConsume - 1;
 
       for (int j = 0; j < numHosts.length; j++) {
         int numHostsToProvision = numHosts[j];
@@ -238,22 +240,26 @@ public class MemoryEstimator {
             activeMemoryForCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost;
         long totalMemoryForConsumingSegmentsPerHost =
             memoryForConsumingSegmentPerPartition * totalConsumingPartitionsPerHost;
-        long activeMemoryPerHostBytes = activeMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost;
-        long mappedMemoryPerHost = totalMemoryForConsumingSegmentsPerHost +
-            (numCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost * completedSegmentSizeBytes);
+        long activeMemoryPerHostBytes =
+            activeMemoryForCompletedSegmentsPerHost + totalMemoryForConsumingSegmentsPerHost;
+        long mappedMemoryPerHost =
+            totalMemoryForConsumingSegmentsPerHost + (numCompletedSegmentsPerPartition * totalConsumingPartitionsPerHost
+                * completedSegmentSizeBytes);
 
         if (activeMemoryPerHostBytes <= _maxUsableHostMemory) {
-          _activeMemoryPerHost[i][j] = DataSizeUtils.fromBytes(activeMemoryPerHostBytes)
-              + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost);
+          _activeMemoryPerHost[i][j] =
+              DataSizeUtils.fromBytes(activeMemoryPerHostBytes) + "/" + DataSizeUtils.fromBytes(mappedMemoryPerHost);
           _consumingMemoryPerHost[i][j] = DataSizeUtils.fromBytes(totalMemoryForConsumingSegmentsPerHost);
           _optimalSegmentSize[i][j] = DataSizeUtils.fromBytes(completedSegmentSizeBytes);
-          _numSegmentsQueriedPerHost[i][j] = String.valueOf(numActiveSegmentsPerPartition * totalConsumingPartitionsPerHost);
+          _numSegmentsQueriedPerHost[i][j] =
+              String.valueOf(numActiveSegmentsPerPartition * totalConsumingPartitionsPerHost);
         }
       }
     }
   }
 
-  private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalDocs) throws IOException {
+  private long getMemoryForConsumingSegmentPerPartition(File statsFile, int totalDocs)
+      throws IOException {
     // We don't want the stats history to get updated from all our dummy runs
     // So we copy over the original stats history every time we start
     File statsFileCopy = new File(_tableDataDir, STATS_FILE_COPY_NAME);
@@ -269,15 +275,15 @@ public class MemoryEstimator {
     RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs);
 
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
-        new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
-            .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs)
-            .setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
-            .setVarLengthDictionaryColumns(_varLengthDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
-            .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
-            .setStatsHistory(statsHistory);
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(_tableNameWithType)
+            .setSegmentName(_segmentMetadata.getName()).setStreamName(_tableNameWithType)
+            .setSchema(_segmentMetadata.getSchema()).setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
+            .setNoDictionaryColumns(_noDictionaryColumns).setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
+            .setInvertedIndexColumns(_invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
+            .setOffHeap(true).setMemoryManager(memoryManager).setStatsHistory(statsHistory);
 
     // create mutable segment impl
-    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build());
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
     long memoryForConsumingSegmentPerPartition = memoryManager.getTotalAllocatedBytes();
     mutableSegmentImpl.destroy();
     FileUtils.deleteQuietly(statsFileCopy);
@@ -362,7 +368,7 @@ public class MemoryEstimator {
   private long calculateMemoryForCompletedSegmentsPerPartition(long completedSegmentSizeBytes, int numHoursToConsume,
       int retentionHours) {
 
-    int numSegmentsInMemory = (retentionHours + numHoursToConsume - 1)/numHoursToConsume;
+    int numSegmentsInMemory = (retentionHours + numHoursToConsume - 1) / numHoursToConsume;
     return completedSegmentSizeBytes * (numSegmentsInMemory - 1);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org