You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/11/12 20:13:09 UTC

[incubator-pinot] 10/12: improve pinot server upsert segment loading time

This is an automated email from the ASF dual-hosted git repository.

jamesshao pushed a commit to branch upsert
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7c8cc38a0aec511e2afb41d7ef9d1cdf1c3174d6
Author: james Shao <sj...@uber.com>
AuthorDate: Mon Nov 11 13:47:16 2019 -0800

    improve pinot server upsert segment loading time
    
    Summary:
    Previously, the Pinot server spent too much time loading virtual columns for upsert segments (10+ seconds for a segment with 5M updates). We investigated and found issues related to how we load the virtual columns. The major issues boil down to:
    
    1. For each update event, we passed a message built with String.format(..) to Preconditions.checkState(...). This causes Java to construct a new string with String.format for every update event, which is very costly.
    2. Various synchronization that is no longer necessary.
    3. Various minor improvements from picking the correct data types and other optimizations.
    
    After optimization, the time to initialize the virtual columns should be < 0.5 seconds for segments with 5M updates. The overall segment loading time should be within 1-2 seconds (including loading the virtual column file from disk and other related work).
    
    Reviewers: bzzhang, tingchen, #streaming_pinot
    
    Reviewed By: bzzhang, #streaming_pinot
    
    Maniphest Tasks: T4441685
    
    Differential Revision: https://code.uberinternal.com/D3571549
---
 pinot-api/pom.xml                                  |   2 +-
 pinot-azure-filesystem/pom.xml                     |   2 +-
 pinot-broker/pom.xml                               |   2 +-
 pinot-common/pom.xml                               |   2 +-
 .../pinot-connector-kafka-0.11/pom.xml             |   2 +-
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |   2 +-
 pinot-connectors/pom.xml                           |   2 +-
 pinot-controller/pom.xml                           |   2 +-
 pinot-core/pom.xml                                 |   2 +-
 .../immutable/ImmutableUpsertSegmentImpl.java      | 119 +++++++++++++--------
 .../segment/updater/UpsertWaterMarkManager.java    |  13 ++-
 .../ValidFromInMemoryVirtualColumnProvider.java    |   3 +-
 .../ValidUntilInMemoryVirtualColumnProvider.java   |   3 +-
 .../VirtualColumnLongValueReaderWriter.java        |  28 +++--
 .../immutable/ImmutableUpsertSegmentImplTest.java  | 118 ++++++++++++++++++++
 pinot-distribution/pom.xml                         |   2 +-
 pinot-grigio/pinot-grigio-common/pom.xml           |   2 +-
 .../pinot/grigio/common/metrics/GrigioMeter.java   |   1 -
 .../SegmentUpdateLogStorageProvider.java           |   7 +-
 .../storageProvider/UpdateLogStorageProvider.java  |   1 -
 .../SegmentUpdateLogStorageProviderTest.java       |  15 +++
 pinot-grigio/pinot-grigio-coordinator/pom.xml      |   2 +-
 pinot-grigio/pom.xml                               |   2 +-
 pinot-hadoop-filesystem/pom.xml                    |   2 +-
 pinot-hadoop/pom.xml                               |   2 +-
 pinot-integration-tests/pom.xml                    |   2 +-
 pinot-minion/pom.xml                               |   2 +-
 pinot-orc/pom.xml                                  |   2 +-
 pinot-parquet/pom.xml                              |   2 +-
 pinot-perf/pom.xml                                 |   2 +-
 pinot-server/pom.xml                               |   2 +-
 pinot-tools/pom.xml                                |   2 +-
 pinot-transport/pom.xml                            |   2 +-
 pom.xml                                            |   2 +-
 34 files changed, 262 insertions(+), 94 deletions(-)

diff --git a/pinot-api/pom.xml b/pinot-api/pom.xml
index 8763b4b..41391eb 100644
--- a/pinot-api/pom.xml
+++ b/pinot-api/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-api</artifactId>
   <name>Pinot API</name>
diff --git a/pinot-azure-filesystem/pom.xml b/pinot-azure-filesystem/pom.xml
index e42b355..ce07b85 100644
--- a/pinot-azure-filesystem/pom.xml
+++ b/pinot-azure-filesystem/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-azure-filesystem</artifactId>
   <name>Pinot Azure Filesystem</name>
diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index d625e2d..4e92b3f 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-broker</artifactId>
   <name>Pinot Broker</name>
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index cde3078..4ce2e32 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-common</artifactId>
   <name>Pinot Common</name>
diff --git a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
index 0b32fe0..5e384d5 100644
--- a/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.11/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-connectors</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index f409a8f..daf74ad 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-connectors</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index b2cfd90..f4e8763 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
 
   <artifactId>pinot-connectors</artifactId>
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index cf1e1f3..259300d 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-controller</artifactId>
   <name>Pinot Controller</name>
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 141249d..06309cc 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-core</artifactId>
   <name>Pinot Core</name>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
index 80723b4..1537689 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImpl.java
@@ -19,10 +19,9 @@
 package org.apache.pinot.core.indexsegment.immutable;
 
 import com.clearspring.analytics.util.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.indexsegment.UpsertSegment;
 import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader;
@@ -42,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -51,19 +49,19 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ImmutableUpsertSegmentImpl.class);
 
-  private final List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter = new ArrayList<>();
-
+  private final List<VirtualColumnLongValueReaderWriter> _virtualColumnsReaderWriter;
   private final String _tableNameWithType;
   private final String _segmentName;
-  private final Schema _schema;
   private final int _totalDoc;
-  private final ColumnIndexContainer _offsetColumnIndexContainer;
-  private final UpsertWaterMarkManager _upsertWaterMarkManager;
   private long _minSourceOffset;
+  private final UpsertWaterMarkManager _upsertWaterMarkManager;
+  private final UpdateLogStorageProvider _updateLogStorageProvider;
   // use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index
   // use 4 bytes per record
   private int[] _sourceOffsetToDocIdArray;
 
+  private static final int DEFAULT_DOC_ID_FOR_MISSING_ENTRY = -1;
+
   public ImmutableUpsertSegmentImpl(SegmentDirectory segmentDirectory,
                                     SegmentMetadataImpl segmentMetadata,
                                     Map<String, ColumnIndexContainer> columnIndexContainerMap,
@@ -73,23 +71,44 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
     _tableNameWithType = TableNameBuilder.ensureTableNameWithType(segmentMetadata.getTableName(),
         CommonConstants.Helix.TableType.REALTIME);
     _segmentName = segmentMetadata.getName();
-    _schema = segmentMetadata.getSchema();
     _totalDoc = segmentMetadata.getTotalDocs();
-    _offsetColumnIndexContainer = columnIndexContainerMap.get(_schema.getOffsetKey());
     _upsertWaterMarkManager = UpsertWaterMarkManager.getInstance();
+    _updateLogStorageProvider = UpdateLogStorageProvider.getInstance();
+    _virtualColumnsReaderWriter = new ArrayList<>();
     for (Map.Entry<String, ColumnIndexContainer> entry: columnIndexContainerMap.entrySet()) {
       String columnName = entry.getKey();
       ColumnIndexContainer container = entry.getValue();
-      if (_schema.isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
+      if (segmentMetadata.getSchema().isVirtualColumn(columnName) && (container.getForwardIndex() instanceof VirtualColumnLongValueReaderWriter)) {
         _virtualColumnsReaderWriter.add((VirtualColumnLongValueReaderWriter) container.getForwardIndex());
       }
     }
-     buildOffsetToDocIdMap();
+     buildOffsetToDocIdMap(columnIndexContainerMap.get(segmentMetadata.getSchema().getOffsetKey()));
   }
 
-  private void buildOffsetToDocIdMap() {
-    final DataFileReader reader = _offsetColumnIndexContainer.getForwardIndex();
-    final Dictionary dictionary = _offsetColumnIndexContainer.getDictionary();
+  /** constructor used for creating instance in test cases
+   * should not be used for creating regular segment
+   */
+  @VisibleForTesting
+  protected ImmutableUpsertSegmentImpl(List<VirtualColumnLongValueReaderWriter> readerWriters,
+                                       int totalDoc, UpsertWaterMarkManager manager,
+                                       UpdateLogStorageProvider updateLogStorageProvider,
+                                       long minSourceOffset, int[] offsetToDocId) {
+    super(null, null, null, null);
+    _tableNameWithType = "testTable";
+    _segmentName = "testSegment";
+    _virtualColumnsReaderWriter = readerWriters;
+    _totalDoc = totalDoc;
+    _upsertWaterMarkManager = manager;
+    _updateLogStorageProvider = updateLogStorageProvider;
+    _minSourceOffset = minSourceOffset;
+    _sourceOffsetToDocIdArray = offsetToDocId;
+
+  }
+
+  private void buildOffsetToDocIdMap(ColumnIndexContainer offsetColumnIndexContainer) {
+    long start = System.currentTimeMillis();
+    final DataFileReader reader = offsetColumnIndexContainer.getForwardIndex();
+    final Dictionary dictionary = offsetColumnIndexContainer.getDictionary();
     Map<Long, Integer> kafkaOffsetToDocIdMap = new HashMap<>();
     long minOffset = Long.MAX_VALUE;
     long maxOffset = 0;
@@ -114,15 +133,13 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
       int size = Math.toIntExact(maxOffset - minOffset + 1);
       _sourceOffsetToDocIdArray = new int[size];
       for (int i = 0; i < size; i++) {
-        if (kafkaOffsetToDocIdMap.containsKey(i + minOffset)) {
-          _sourceOffsetToDocIdArray[i] = kafkaOffsetToDocIdMap.get(i + minOffset);
-        } else {
-          _sourceOffsetToDocIdArray[i] = -1;
-        }
+        _sourceOffsetToDocIdArray[i] = kafkaOffsetToDocIdMap.getOrDefault(i + minOffset,
+            DEFAULT_DOC_ID_FOR_MISSING_ENTRY);
       }
     } else {
       throw new RuntimeException("unexpected forward reader type for kafka offset column " + reader.getClass());
     }
+    LOGGER.info("built offset to DocId map for segment {} with {} documents in {} ms", _segmentName, _totalDoc, System.currentTimeMillis() - start);
   }
 
   public static ImmutableUpsertSegmentImpl copyOf(ImmutableSegmentImpl immutableSegment) {
@@ -163,32 +180,48 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
   @Override
   public void initVirtualColumn() throws IOException {
     long start = System.currentTimeMillis();
-    List<UpdateLogEntry> updateLogEntries = UpdateLogStorageProvider.getInstance().getAllMessages(_tableNameWithType, _segmentName);
-    Multimap<Long, UpdateLogEntry> updateLogEntryMap = ArrayListMultimap.create();
-    for (UpdateLogEntry logEntry: updateLogEntries) {
-      updateLogEntryMap.put(logEntry.getOffset(), logEntry);
-    }
-    for (int i = 0; i < _sourceOffsetToDocIdArray.length; i++) {
-      if (_sourceOffsetToDocIdArray[i] != -1) {
-        final long offset = i + _minSourceOffset;
-        final int docId = _sourceOffsetToDocIdArray[i];
-        if (updateLogEntryMap.containsKey(offset)) {
-          boolean updated = false;
-          Collection<UpdateLogEntry> entries = updateLogEntryMap.get(offset);
-          UpdateLogEntry lastEntry = null;
-          for (UpdateLogEntry entry : entries) {
-            lastEntry = entry;
-            for (VirtualColumnLongValueReaderWriter readerWriter : _virtualColumnsReaderWriter) {
-              updated = readerWriter.update(docId, entry.getValue(), entry.getType()) || updated;
+    final List<UpdateLogEntry> updateLogEntries = _updateLogStorageProvider.getAllMessages(_tableNameWithType, _segmentName);
+    LOGGER.info("load {} update log entry from update log storage provider for segment {} in {} ms",
+        updateLogEntries.size(), _segmentName, System.currentTimeMillis() - start);
+
+    start = System.currentTimeMillis();
+    final long maxOffset = _totalDoc + _minSourceOffset;
+    int unmatchedLogEntryCount = 0;
+    try {
+      Map<Integer, Long> partitionToHighestWatermark = new HashMap<>();
+      int readerWriteCount = _virtualColumnsReaderWriter.size();
+      for (UpdateLogEntry logEntry: updateLogEntries) {
+        final int partition = logEntry.getPartition();
+        final long offset = logEntry.getOffset();
+        if (offset >= _minSourceOffset && offset < maxOffset) {
+          final int docId = _sourceOffsetToDocIdArray[Math.toIntExact(offset - _minSourceOffset)];
+          if (docId != DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
+            // use traditional for loop over int instead of foreach to give hints for JIT to do loop unroll in byte code
+            for (int i = 0; i < readerWriteCount; i++) {
+              _virtualColumnsReaderWriter.get(i).update(docId, logEntry.getValue(), logEntry.getType());
             }
+            if (logEntry.getValue() > partitionToHighestWatermark.getOrDefault(partition, -1L)) {
+              partitionToHighestWatermark.put(partition, logEntry.getValue());
+            }
+          } else {
+            LOGGER.error("segment {} got in-range update log at offset {} but no matching docId", _segmentName, offset);
           }
-          if (updated) {
-            _upsertWaterMarkManager.processMessage(_tableNameWithType, _segmentName, lastEntry);
-          }
+        } else {
+          unmatchedLogEntryCount++;
         }
       }
+      if (unmatchedLogEntryCount > 0) {
+        LOGGER.info("segment {} encountered {} update logs that are outside of its range", _segmentName,
+            unmatchedLogEntryCount);
+      }
+      partitionToHighestWatermark.forEach((partition, value) ->
+          _upsertWaterMarkManager.processVersionUpdate(_tableNameWithType, partition, value));
+
+    } catch (Exception e) {
+      LOGGER.error("failed to load the offset with thread pool");
+      Utils.rethrowException(e);
     }
-    LOGGER.info("loaded {} update log entries for current immutable segment {} in {} ms", updateLogEntries.size(),
+    LOGGER.info("populated all log entries to virtual columns for current immutable segment {} in {} ms",
         _segmentName, System.currentTimeMillis() - start);
   }
 
@@ -205,7 +238,7 @@ public class ImmutableUpsertSegmentImpl extends ImmutableSegmentImpl implements
       throw new RuntimeException("offset outside range");
     } else {
       int position = Math.toIntExact(offset - _minSourceOffset);
-      if (_sourceOffsetToDocIdArray[position] == -1) {
+      if (_sourceOffsetToDocIdArray[position] == DEFAULT_DOC_ID_FOR_MISSING_ENTRY) {
         LOGGER.error("no docId associated with offset {} for segment {}", offset, _segmentName);
         throw new RuntimeException("docId not found");
       } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
index 6d37322..bbea037 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java
@@ -22,7 +22,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.apache.pinot.grigio.common.metrics.GrigioGauge;
-import org.apache.pinot.grigio.common.metrics.GrigioMeter;
 import org.apache.pinot.grigio.common.metrics.GrigioMetrics;
 import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
 import org.slf4j.Logger;
@@ -62,15 +61,15 @@ public class UpsertWaterMarkManager {
     }
     long version = logEntry.getValue();
     int partition = logEntry.getPartition();
-    Preconditions.checkState(partition >= 0, "logEntry has no partition info {} for table ", logEntry.toString(), table);
+    processVersionUpdate(table, partition, version);
+  }
 
+  public void processVersionUpdate(String table, int partition, long version) {
+    Preconditions.checkState(partition >= 0, "logEntry has invalid version {} for table {}",
+        version, table);
     Map<Integer, Long> partitionToHighWaterMark = _highWaterMarkTablePartitionMap.computeIfAbsent(table, t -> new ConcurrentHashMap<>());
     long currentVersion = partitionToHighWaterMark.getOrDefault(partition, -1L);  // assumes that valid version is non-negative
-    if (version < currentVersion) {
-      // We expect the version number to increase monotonically unless we are reprocessing previous seen messages.
-      _metrics.addMeteredGlobalValue(GrigioMeter.VERSION_LOWER_THAN_CURRENT, 1);
-      LOGGER.debug("The latest record {} has lower version than the current one for the table {} ", logEntry, table);
-    } else if (version > currentVersion) {
+    if (version > currentVersion) {
       partitionToHighWaterMark.put(partition, version);
       _metrics.setValueOfTableGauge(String.valueOf(partition), GrigioGauge.SERVER_VERSION_CONSUMED, version);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java
index 2dedc8e..ebb45e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidFromInMemoryVirtualColumnProvider.java
@@ -30,7 +30,8 @@ public class ValidFromInMemoryVirtualColumnProvider extends BaseLongVirtualColum
       @Override
       public boolean update(int docId, int value, LogEventType eventType) {
         if (eventType == LogEventType.INSERT) {
-          return updateValue(docId, value);
+          updateValue(docId, value);
+          return true;
         }
         return false;
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java
index 1a0856e..9aef94c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/ValidUntilInMemoryVirtualColumnProvider.java
@@ -30,7 +30,8 @@ public class ValidUntilInMemoryVirtualColumnProvider extends BaseLongVirtualColu
       @Override
       public boolean update(int docId, int offset, LogEventType eventType) {
         if (eventType == LogEventType.DELETE) {
-          return this.updateValue(docId, offset);
+          this.updateValue(docId, offset);
+          return true;
         }
         return false;
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
index 75d1b74..78f3bda 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/mutable/VirtualColumnLongValueReaderWriter.java
@@ -31,17 +31,19 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
 
   private static final Logger LOGGER = LoggerFactory.getLogger(VirtualColumnLongValueReaderWriter.class);
 
-  private final VirtualColumnContext _context;
   private int _totalDocSize;
   private int _currentMaxDocId;
   private final int [] _values;
   private final int DEFAULT_NEW_VALUE = -1;
 
   public VirtualColumnLongValueReaderWriter(VirtualColumnContext context) {
-    _context = context;
-    _totalDocSize = context.getTotalDocCount();
+    this(context.getTotalDocCount(), context.isMutableSegment());
+  }
+
+  public VirtualColumnLongValueReaderWriter(int totalDocSize, boolean isMutableSegment) {
+    _totalDocSize = totalDocSize;
     _values = new int[_totalDocSize];
-    if (!_context.isMutableSegment()) {
+    if (!isMutableSegment) {
       Arrays.fill(_values, -1);
     }
   }
@@ -80,10 +82,11 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
   }
 
   /**
-   * method to update the internal data to value at a given location
-   * synchronized to ensure the we will not modify the internal array at the same time from multiple threads
+   * update the internal data to value at a given location
+   * assume the update will be idempotent and we won't check if the existing data have been set
+   *
    */
-  protected synchronized boolean updateValue(int docId, int value) {
+  protected void updateValue(int docId, int value) {
     if (docId >= _totalDocSize) {
       throw new RuntimeException(String.format("new record docId %s is larger than capacity %s", docId, _totalDocSize));
     }
@@ -91,17 +94,12 @@ public abstract class VirtualColumnLongValueReaderWriter extends BaseVirtualColu
       throw new RuntimeException(String.format("failed to update virtual column: with value %s:%s we are trying to " +
               "update a value that has not been ingested yet, max doc id %s", docId, value, _currentMaxDocId));
     }
-    if (_values[docId] == value) {
-      return false;
-    } else {
-      _values[docId] = value;
-      return true;
-    }
+    _values[docId] = value;
   }
 
   // ensure backward compatibility
-  protected boolean updateValue(int docId, long value) {
-    return updateValue(docId, Math.toIntExact(value));
+  protected void updateValue(int docId, long value) {
+    updateValue(docId, Math.toIntExact(value));
   }
 
   public abstract boolean update(int docId, long value, LogEventType eventType);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
new file mode 100644
index 0000000..4712625
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/immutable/ImmutableUpsertSegmentImplTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.immutable;
+
+import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager;
+import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter;
+import org.apache.pinot.grigio.common.messages.LogEventType;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry;
+import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ImmutableUpsertSegmentImplTest {
+
+  UpdateLogStorageProvider _mockProvider;
+  UpsertWaterMarkManager _mockUpsertWaterMarkManager;
+  List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>();
+
+  @BeforeMethod
+  public void init() {
+    _mockProvider = mock(UpdateLogStorageProvider.class);
+    _mockUpsertWaterMarkManager = mock(UpsertWaterMarkManager.class);
+  }
+
+  @Test
+  public void testInitVirtualColumn() throws IOException {
+    long start = System.currentTimeMillis();
+    long minOffset = 5000_000l;
+    int totalDocs = 5_000_000;
+    _readerWriters.add(new VirtualColumnLongValueReaderWriter(totalDocs, false) {
+      @Override
+      public boolean update(int docId, long value, LogEventType eventType) {
+        if (eventType == LogEventType.INSERT) {
+          updateValue(docId, value);
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public boolean update(int docId, int value, LogEventType eventType) {
+        return update(docId, value, eventType);
+      }
+    });
+    _readerWriters.add(new VirtualColumnLongValueReaderWriter(totalDocs, false) {
+      @Override
+      public boolean update(int docId, long value, LogEventType eventType) {
+        if (eventType == LogEventType.DELETE) {
+          updateValue(docId, value);
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public boolean update(int docId, int value, LogEventType eventType) {
+        return update(docId, value, eventType);
+      }
+    });
+    int[] offsetToDocId = new int[totalDocs];
+    for (int i = 0; i < totalDocs; i++) {
+      offsetToDocId[i] = i;
+    }
+    List<UpdateLogEntry> updateLogEntries = new ArrayList<>(totalDocs * 2);
+    for (int i = 0; i < totalDocs; i++) {
+      updateLogEntries.add(new UpdateLogEntry(minOffset + i, 50, LogEventType.INSERT, i%8));
+      updateLogEntries.add(new UpdateLogEntry(minOffset + i, 100, LogEventType.DELETE, i%8));
+    }
+    when(_mockProvider.getAllMessages(anyString(), anyString())).thenReturn(updateLogEntries);
+    System.out.println("run time for set up: " + (System.currentTimeMillis() - start));
+
+    start = System.currentTimeMillis();
+
+    ImmutableUpsertSegmentImpl immutableUpsertSegment = new ImmutableUpsertSegmentImpl(_readerWriters, totalDocs,
+        _mockUpsertWaterMarkManager, _mockProvider, minOffset, offsetToDocId);
+
+    immutableUpsertSegment.initVirtualColumn();
+    long runtime = System.currentTimeMillis() - start;
+    System.out.println("run time is " + runtime);
+    Assert.assertTrue(runtime < 1_000L, "run time should be less than 1 second");
+
+    VirtualColumnLongValueReaderWriter insertReaderWrite = _readerWriters.get(0);
+    VirtualColumnLongValueReaderWriter deleteReaderWrite = _readerWriters.get(1);
+    for (int i = 0; i < totalDocs; i++) {
+      if (insertReaderWrite.getLong(i) != 50 || deleteReaderWrite.getLong(i) != 100) {
+        System.out.println(String.format("position %d has value %d/%d", i, insertReaderWrite.getLong(i),
+            deleteReaderWrite.getLong(i)));
+        Assert.fail("no correct value");
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index ebf0146..f55e1a7 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-distribution</artifactId>
   <name>Pinot Distribution</name>
diff --git a/pinot-grigio/pinot-grigio-common/pom.xml b/pinot-grigio/pinot-grigio-common/pom.xml
index 42e07c0..0c98e85 100644
--- a/pinot-grigio/pinot-grigio-common/pom.xml
+++ b/pinot-grigio/pinot-grigio-common/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-grigio</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java
index ab04b6e..8a738da 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/metrics/GrigioMeter.java
@@ -29,7 +29,6 @@ public enum GrigioMeter implements AbstractMetrics.Meter {
 
   // segment updater metrics
   MESSAGE_FETCH_PER_ROUND("messages", MetricsType.SERVER_ONLY),
-  VERSION_LOWER_THAN_CURRENT("messages", MetricsType.SERVER_ONLY),
 
   // key coordinator related metrics
   MESSAGE_PRODUCE_FAILED_COUNT("message", MetricsType.KC_ONLY),
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
index 2972473..02f2fba 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProvider.java
@@ -58,6 +58,7 @@ public class SegmentUpdateLogStorageProvider {
   }
 
   public synchronized List<UpdateLogEntry> readAllMessagesFromFile() throws IOException {
+    long start = System.currentTimeMillis();
     int insertMessageCount = 0;
     int deleteMessageCount = 0;
     int fileLength = (int) _file.length();
@@ -76,7 +77,8 @@ public class SegmentUpdateLogStorageProvider {
         logs.add(logEntry);
       }
       buffer.clear();
-      LOGGER.info("loaded {} message from file, {} insert and {} delete", messageCount, insertMessageCount, deleteMessageCount);
+      LOGGER.info("loaded {} message from file, {} insert and {} delete in {} ms", messageCount, insertMessageCount,
+          deleteMessageCount, System.currentTimeMillis() - start);
       return logs;
     } else {
       return ImmutableList.of();
@@ -123,6 +125,7 @@ public class SegmentUpdateLogStorageProvider {
   }
 
   private synchronized void readFullyFromBeginning(File segmentUpdateFile, ByteBuffer buffer) throws IOException {
+    long start = System.currentTimeMillis();
     FileChannel channel = new RandomAccessFile(segmentUpdateFile, "r").getChannel();
     channel.position(0);
     long position = 0;
@@ -132,6 +135,8 @@ public class SegmentUpdateLogStorageProvider {
       position += byteRead;
     } while (byteRead != -1 && buffer.hasRemaining());
     buffer.flip();
+    LOGGER.info("read all data from segment update file {} to buffer in {} ms", segmentUpdateFile.getName(),
+        System.currentTimeMillis() - start);
   }
 
   /**
diff --git a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
index 73ba01f..d04b1a2 100644
--- a/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
+++ b/pinot-grigio/pinot-grigio-common/src/main/java/org/apache/pinot/grigio/common/storageProvider/UpdateLogStorageProvider.java
@@ -143,7 +143,6 @@ public class UpdateLogStorageProvider {
   }
 
   public List<UpdateLogEntry> getAllMessages(String tableName, String segmentName) throws IOException {
-    LOGGER.info("loading all message for table {} segment {}", tableName, segmentName);
     if (_virtualColumnStorage.containsKey(tableName)) {
       SegmentUpdateLogStorageProvider provider = _virtualColumnStorage.get(tableName).get(segmentName);
       if (provider != null) {
diff --git a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
index 69d30bc..64d4d3b 100644
--- a/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
+++ b/pinot-grigio/pinot-grigio-common/src/test/java/org/apache/pinot/grigio/common/storageProvider/SegmentUpdateLogStorageProviderTest.java
@@ -115,6 +115,21 @@ public class SegmentUpdateLogStorageProviderTest {
       Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 1), inputDataList.get(1));
       Assert.assertEquals(updateLogEntries.get(i * inputDataList.size() + 2), inputDataList.get(2));
     }
+  }
 
+  @Test
+  public void testReadMesssagePerf() throws IOException {
+    int totalMessageCount = 5_000_000;
+    List<UpdateLogEntry> inputMessages = new ArrayList<>(totalMessageCount * 2);
+    for (int i = 0; i < totalMessageCount; i++) {
+      inputMessages.add(new UpdateLogEntry(i, 50, LogEventType.INSERT, i%8));
+      inputMessages.add(new UpdateLogEntry(i, 100, LogEventType.DELETE, i%8));
+    }
+    long start = System.currentTimeMillis();
+    provider.addData(inputMessages);
+    System.out.println("write data takes ms: " + (System.currentTimeMillis() - start));
+    start = System.currentTimeMillis();
+    provider.readAllMessagesFromFile();
+    System.out.println("read data takes ms: " + (System.currentTimeMillis() - start));
   }
 }
\ No newline at end of file
diff --git a/pinot-grigio/pinot-grigio-coordinator/pom.xml b/pinot-grigio/pinot-grigio-coordinator/pom.xml
index f5b9527..7ef66ce 100644
--- a/pinot-grigio/pinot-grigio-coordinator/pom.xml
+++ b/pinot-grigio/pinot-grigio-coordinator/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot-grigio</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <artifactId>pinot-grigio-coordinator</artifactId>
diff --git a/pinot-grigio/pom.xml b/pinot-grigio/pom.xml
index 3cdad0c..2d3a6a7 100644
--- a/pinot-grigio/pom.xml
+++ b/pinot-grigio/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <artifactId>pinot-grigio</artifactId>
diff --git a/pinot-hadoop-filesystem/pom.xml b/pinot-hadoop-filesystem/pom.xml
index 49b0a8a..0e0f03d 100644
--- a/pinot-hadoop-filesystem/pom.xml
+++ b/pinot-hadoop-filesystem/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-hadoop-filesystem</artifactId>
   <name>Pinot Hadoop Filesystem</name>
diff --git a/pinot-hadoop/pom.xml b/pinot-hadoop/pom.xml
index 928a11b..13691d8 100644
--- a/pinot-hadoop/pom.xml
+++ b/pinot-hadoop/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-hadoop</artifactId>
   <name>Pinot Hadoop</name>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 9d7cb12..8ea6a8d 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-integration-tests</artifactId>
   <name>Pinot Integration Tests</name>
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index c057094..72b1d85 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-minion</artifactId>
   <name>Pinot Minion</name>
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
index 34a8740..9508747 100644
--- a/pinot-orc/pom.xml
+++ b/pinot-orc/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/pinot-parquet/pom.xml b/pinot-parquet/pom.xml
index 2d7dbb0..577e283 100644
--- a/pinot-parquet/pom.xml
+++ b/pinot-parquet/pom.xml
@@ -23,7 +23,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
 
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 060ebaf..704f659 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-perf</artifactId>
   <name>Pinot Perf</name>
diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml
index ac77f20..0503db3 100644
--- a/pinot-server/pom.xml
+++ b/pinot-server/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-server</artifactId>
   <name>Pinot Server</name>
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 4b4e686..383cc1f 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-tools</artifactId>
   <name>Pinot Tools</name>
diff --git a/pinot-transport/pom.xml b/pinot-transport/pom.xml
index b977921..5359b71 100644
--- a/pinot-transport/pom.xml
+++ b/pinot-transport/pom.xml
@@ -24,7 +24,7 @@
   <parent>
     <artifactId>pinot</artifactId>
     <groupId>org.apache.pinot</groupId>
-    <version>0.2.2.5.49-SNAPSHOT</version>
+    <version>0.2.2.5.57-SNAPSHOT</version>
   </parent>
   <artifactId>pinot-transport</artifactId>
   <name>Pinot Transport</name>
diff --git a/pom.xml b/pom.xml
index 77e8647..66af82b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,7 @@
 
   <groupId>org.apache.pinot</groupId>
   <artifactId>pinot</artifactId>
-  <version>0.2.2.5.49-SNAPSHOT</version>
+  <version>0.2.2.5.57-SNAPSHOT</version>
   <packaging>pom</packaging>
   <name>Pinot</name>
   <description>A realtime distributed OLAP datastore</description>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org