You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/14 07:16:50 UTC

[GitHub] [incubator-pinot] yupeng9 commented on a change in pull request #6899: Add partial upsert config and mergers

yupeng9 commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r650204656



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PartialUpsertHandler {

Review comment:
       add javadoc

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
       Assert.assertEquals(e.getMessage(), "The upsert table cannot have star-tree index.");
     }
   }
+
+  @Test
+  public void testValidatePartialUpsertConfig() {
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1", FieldSpec.DataType.LONG)
+            .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs
+        .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be enabled for partial upsert tables.");
+    }
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied to PK.");
+    }
+
+    partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric data types.");

Review comment:
       Can the INCREMENT merger be applied to time-type columns?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PartialUpsertHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartialUpsertHandler.class);
+
+  private final Map<String, PartialUpsertMerger> _mergers = new HashMap<>();
+
+  private final HelixManager _helixManager;
+  private final String _tableNameWithType;
+  private boolean _allSegmentsLoaded;

Review comment:
       This seems to be a cache; do we ever need to reset it?

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
##########
@@ -30,16 +32,37 @@
     FULL, PARTIAL, NONE
   }
 
+  public enum Strategy {
+    OVERWRITE, INCREMENT

Review comment:
       Add a TODO for a custom strategy.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+import org.apache.pinot.spi.config.table.UpsertConfig;
+
+
+public class PartialUpsertMergerFactory {
+  private PartialUpsertMergerFactory() {
+  }
+
+  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
+  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
+
+  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
+    switch (strategy) {
+      case OVERWRITE:
+        return OVERWRITE_MERGER;
+      case INCREMENT:
+        return INCREMENT_MERGER;

Review comment:
       do we have a test case to cover the case that the previous value does not exist, and the merger uses the increment value directly (instead of adding to the null-default value)?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
##########
@@ -350,6 +357,36 @@ public static void validateUpsertConfig(TableConfig tableConfig, Schema schema)
             .getIndexingConfig().isEnableDefaultStarTree(), "The upsert table cannot have star-tree index.");
   }
 
+  /**
+   * Validates the partial upsert-related configurations
+   *  - INCREMENT merger cannot be applied to PK.
+   *  - INCREMENT merger should be numeric data types.
+   *  - enforce nullValueHandling for partial upsert tables.
+   */
+  private static void validatePartialUpsertStrategies(Schema schema, TableConfig tableConfig) {
+    if (tableConfig.getUpsertMode() != UpsertConfig.Mode.PARTIAL) {
+      return;
+    }
+
+    Preconditions.checkState(tableConfig.getIndexingConfig().isNullHandlingEnabled(),
+        "NullValueHandling is required to be enabled for partial upsert tables.");
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStrategies =
+        tableConfig.getUpsertConfig().getPartialUpsertStrategies();
+
+    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
+      Set<FieldSpec.DataType> numericsDataType = new HashSet<>(Arrays.asList(INT, LONG, FLOAT, DOUBLE));

Review comment:
       This should also allow datetime columns.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -178,20 +175,20 @@ public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics
             realtimeSegmentZKMetadata.getEndTime(), realtimeSegmentZKMetadata.getTimeUnit(),
             realtimeSegmentZKMetadata.getTotalDocs(), realtimeSegmentZKMetadata.getCrc(), _schema) {
           @Override
-      public int getTotalDocs() {
-        return _numDocsIndexed;
-      }
+          public int getTotalDocs() {
+            return _numDocsIndexed;
+          }
 
-      @Override
-      public long getLastIndexedTimestamp() {
-        return _lastIndexedTimeMs;
-      }
+          @Override
+          public long getLastIndexedTimestamp() {
+            return _lastIndexedTimeMs;
+          }
 
-      @Override
-      public long getLatestIngestionTimestamp() {
-        return _latestIngestionTimeMs;
-      }
-    };
+          @Override
+          public long getLatestIngestionTimestamp() {
+            return _latestIngestionTimeMs;

Review comment:
       Why the indentation change?

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -984,17 +989,19 @@ public void testValidateIndexingConfig() {
       // Expected
     }
 
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setVarLengthDictionaryColumns(Arrays.asList("intCol")).
-        build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setVarLengthDictionaryColumns(Arrays.asList("intCol")).

Review comment:
       curious, are you using a different formatter that resulted in these changes?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +64,175 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
+  /**
+   * Initializes the upsert metadata for the given immutable segment.
+   */
+  public void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator) {
+    addSegment(immutableSegment, recordInfoIterator, new ThreadSafeMutableRoaringBitmap());
   }
 
-  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
   @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
-  /**
-   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
-   */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+  void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    String segmentName = immutableSegment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    immutableSegment.enableUpsert(this, validDocIds);
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie to keep the newer record. Note that the
-            //      record info iterator will return records with incremental doc ids.
-            //   2. The current record location is pointing to the old segment being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
-            //      the record location when there is a tie because the record locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (immutableSegment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
-                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced
+          // This could happen when committing a consuming segment, or reloading a completed segment. In this case, we
+          // want to update the record location when there is a tie because the record locations should point to the new
+          // added segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+          // segment because it has not been replaced yet.
+          String currentSegmentName = currentSegment.getSegmentName();
+          if (segmentName.equals(currentSegmentName)) {
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
           }
+
+          // The current record is in a different segment
+          // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+          // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+              recordInfo._timestamp == currentRecordLocation.getTimestamp() && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(segmentName) && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(currentSegmentName)
+                  && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
+                  .getSequenceNumber(currentSegmentName))) {
+            assert currentSegment.getValidDocIds() != null;
+            currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
+          } else {
+            return currentRecordLocation;
+          }
         } else {
           // New primary key
           validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
         }
       });
     }
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
-    return validDocIds;
   }
 
   /**
-   * Updates the upsert metadata for a new consumed record in the given consuming segment.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment. Returns the merged record if
+   * partial-upsert is enabled.
    */
-  public void updateRecord(String segmentName, RecordInfo recordInfo, ThreadSafeMutableRoaringBitmap validDocIds) {
+  public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, GenericRow record) {
+    // For partial-upsert, need to ensure all previous records are loaded before inserting new records.
+    if (_partialUpsertHandler != null) {
+      while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+        LOGGER
+            .info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
+        try {
+          //noinspection BusyWait
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    _result = record;
     _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
       if (currentRecordLocation != null) {
         // Existing primary key
 
         // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update
         // the record location when there is a tie to keep the newer record.
         if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-          currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
-          validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (_partialUpsertHandler != null) {
+            // Partial upsert
+            GenericRow previousRecord = currentSegment.getRecord(currentRecordLocation.getDocId(), _reuse);

Review comment:
       Can `previousRecord` be null if the record is not found?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +64,175 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
+  /**
+   * Initializes the upsert metadata for the given immutable segment.
+   */
+  public void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator) {
+    addSegment(immutableSegment, recordInfoIterator, new ThreadSafeMutableRoaringBitmap());
   }
 
-  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
   @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
-  /**
-   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
-   */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+  void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    String segmentName = immutableSegment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    immutableSegment.enableUpsert(this, validDocIds);
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie to keep the newer record. Note that the
-            //      record info iterator will return records with incremental doc ids.
-            //   2. The current record location is pointing to the old segment being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
-            //      the record location when there is a tie because the record locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (immutableSegment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
-                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced
+          // This could happen when committing a consuming segment, or reloading a completed segment. In this case, we
+          // want to update the record location when there is a tie because the record locations should point to the new
+          // added segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+          // segment because it has not been replaced yet.
+          String currentSegmentName = currentSegment.getSegmentName();
+          if (segmentName.equals(currentSegmentName)) {
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
           }
+
+          // The current record is in a different segment
+          // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+          // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+              recordInfo._timestamp == currentRecordLocation.getTimestamp() && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(segmentName) && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(currentSegmentName)
+                  && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
+                  .getSequenceNumber(currentSegmentName))) {
+            assert currentSegment.getValidDocIds() != null;
+            currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
+          } else {
+            return currentRecordLocation;
+          }
         } else {
           // New primary key
           validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
         }
       });
     }
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
-    return validDocIds;
   }
 
   /**
-   * Updates the upsert metadata for a new consumed record in the given consuming segment.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment. Returns the merged record if
+   * partial-upsert is enabled.
    */
-  public void updateRecord(String segmentName, RecordInfo recordInfo, ThreadSafeMutableRoaringBitmap validDocIds) {
+  public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, GenericRow record) {
+    // For partial-upsert, need to ensure all previous records are loaded before inserting new records.
+    if (_partialUpsertHandler != null) {
+      while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+        LOGGER
+            .info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);

Review comment:
       Is it possible that the segment gets sealed during this 1s sleep?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +64,175 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
+  /**
+   * Initializes the upsert metadata for the given immutable segment.
+   */
+  public void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator) {
+    addSegment(immutableSegment, recordInfoIterator, new ThreadSafeMutableRoaringBitmap());
   }
 
-  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
   @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
-  /**
-   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
-   */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+  void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    String segmentName = immutableSegment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    immutableSegment.enableUpsert(this, validDocIds);
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie to keep the newer record. Note that the
-            //      record info iterator will return records with incremental doc ids.
-            //   2. The current record location is pointing to the old segment being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
-            //      the record location when there is a tie because the record locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (immutableSegment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
-                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced
+          // This could happen when committing a consuming segment, or reloading a completed segment. In this case, we
+          // want to update the record location when there is a tie because the record locations should point to the new
+          // added segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+          // segment because it has not been replaced yet.
+          String currentSegmentName = currentSegment.getSegmentName();
+          if (segmentName.equals(currentSegmentName)) {
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
           }
+
+          // The current record is in a different segment
+          // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+          // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).

Review comment:
       Why do we need this additional handling, and why split the handling of immutable segments vs. consuming segments? Why does partial upsert require this change?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+public class IncrementMerger implements PartialUpsertMerger {
+  IncrementMerger() {

Review comment:
       private

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PartialUpsertHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartialUpsertHandler.class);
+
+  private final Map<String, PartialUpsertMerger> _mergers = new HashMap<>();
+
+  private final HelixManager _helixManager;
+  private final String _tableNameWithType;
+  private boolean _allSegmentsLoaded;

Review comment:
       I see. Is it possible for a loaded segment to be reloaded?

##########
File path: pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
##########
@@ -1098,4 +1105,46 @@ public void testValidateUpsertConfig() {
       Assert.assertEquals(e.getMessage(), "The upsert table cannot have star-tree index.");
     }
   }
+
+  @Test
+  public void testValidatePartialUpsertConfig() {
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol1", FieldSpec.DataType.LONG)
+            .addSingleValueDimension("myCol2", FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol1")).build();
+
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("stream.kafka.consumer.type", "highLevel");
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.topic.name", "test");
+    streamConfigs
+        .put("stream.kafka.decoder.class.name", "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.PARTIAL)).setNullHandlingEnabled(false)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "NullValueHandling is required to be enabled for partial upsert tables.");
+    }
+
+    Map<String, UpsertConfig.Strategy> partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol1", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger cannot be applied to PK.");
+    }
+
+    partialUpsertStratgies = new HashMap<>();
+    partialUpsertStratgies.put("myCol2", UpsertConfig.Strategy.INCREMENT);
+    try {
+      TableConfigUtils.validateUpsertConfig(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "INCREMENT merger should be numeric data types.");

Review comment:
       One possible use case is the time counter increments.

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +64,175 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
+  /**
+   * Initializes the upsert metadata for the given immutable segment.
+   */
+  public void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator) {
+    addSegment(immutableSegment, recordInfoIterator, new ThreadSafeMutableRoaringBitmap());
   }
 
-  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
   @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
-  /**
-   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
-   */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+  void addSegment(ImmutableSegmentImpl immutableSegment, Iterator<RecordInfo> recordInfoIterator,
+      ThreadSafeMutableRoaringBitmap validDocIds) {
+    String segmentName = immutableSegment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    immutableSegment.enableUpsert(this, validDocIds);
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie to keep the newer record. Note that the
-            //      record info iterator will return records with incremental doc ids.
-            //   2. The current record location is pointing to the old segment being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
-            //      the record location when there is a tie because the record locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (immutableSegment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
-                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced
+          // This could happen when committing a consuming segment, or reloading a completed segment. In this case, we
+          // want to update the record location when there is a tie because the record locations should point to the new
+          // added segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+          // segment because it has not been replaced yet.
+          String currentSegmentName = currentSegment.getSegmentName();
+          if (segmentName.equals(currentSegmentName)) {
+            if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
           }
+
+          // The current record is in a different segment
+          // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
+          // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
+          if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
+              recordInfo._timestamp == currentRecordLocation.getTimestamp() && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(segmentName) && LLCSegmentName
+                  .isLowLevelConsumerSegmentName(currentSegmentName)
+                  && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
+                  .getSequenceNumber(currentSegmentName))) {
+            assert currentSegment.getValidDocIds() != null;
+            currentSegment.getValidDocIds().remove(currentRecordLocation.getDocId());
+            validDocIds.add(recordInfo._docId);
+            return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
+          } else {
+            return currentRecordLocation;
+          }
         } else {
           // New primary key
           validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          return new RecordLocation(immutableSegment, recordInfo._docId, recordInfo._timestamp);
         }
       });
     }
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
-    return validDocIds;
   }
 
   /**
-   * Updates the upsert metadata for a new consumed record in the given consuming segment.
+   * Updates the upsert metadata for a new consumed record in the given consuming segment. Returns the merged record if
+   * partial-upsert is enabled.
    */
-  public void updateRecord(String segmentName, RecordInfo recordInfo, ThreadSafeMutableRoaringBitmap validDocIds) {
+  public GenericRow updateRecord(IndexSegment segment, RecordInfo recordInfo, GenericRow record) {
+    // For partial-upsert, need to ensure all previous records are loaded before inserting new records.
+    if (_partialUpsertHandler != null) {
+      while (!_partialUpsertHandler.isAllSegmentsLoaded()) {
+        LOGGER
+            .info("Sleeping 1 second waiting for all segments loaded for partial-upsert table: {}", _tableNameWithType);
+        try {
+          //noinspection BusyWait
+          Thread.sleep(1000L);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    _result = record;
     _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
       if (currentRecordLocation != null) {
         // Existing primary key
 
         // Update the record location when the new timestamp is greater than or equal to the current timestamp. Update
         // the record location when there is a tie to keep the newer record.
         if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-          currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
-          validDocIds.add(recordInfo._docId);
-          return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (_partialUpsertHandler != null) {
+            // Partial upsert
+            GenericRow previousRecord = currentSegment.getRecord(currentRecordLocation.getDocId(), _reuse);

Review comment:
       Hmm, what if the segment expires in between?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/IncrementMerger.java
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+public class IncrementMerger implements PartialUpsertMerger {
+  IncrementMerger() {

Review comment:
       ack




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org