You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/03/03 19:43:22 UTC

[pinot] branch master updated: Add support for multiple comparison columns (i.e. one comparison column per producer sinking to a table) (#10234)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e03df02f6 Add support for multiple comparison columns (i.e. one comparison column per producer sinking to a table) (#10234)
2e03df02f6 is described below

commit 2e03df02f65d766bde040389e4e6b12ea09710fa
Author: Evan Galpin <eg...@users.noreply.github.com>
AuthorDate: Fri Mar 3 11:43:16 2023 -0800

    Add support for multiple comparison columns (i.e. one comparison column per producer sinking to a table) (#10234)
---
 .../multipleIndexPartialUpsertConfig.json          | 41 +++++++++++
 ...artialUpsertMultipleComparisonColumnConfig.json | 41 +++++++++++
 .../api/resources/PinotUpsertRestletResource.java  | 27 +++++---
 .../realtime/LLRealtimeSegmentDataManager.java     |  2 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |  3 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 48 ++++++++++---
 .../local/realtime/impl/RealtimeSegmentConfig.java | 18 ++---
 .../upsert/BasePartitionUpsertMetadataManager.java | 14 ++--
 .../upsert/BaseTableUpsertMetadataManager.java     | 11 +--
 .../segment/local/upsert/ComparisonColumns.java    | 67 ++++++++++++++++++
 ...oncurrentMapPartitionUpsertMetadataManager.java |  4 +-
 .../ConcurrentMapTableUpsertMetadataManager.java   |  2 +-
 .../segment/local/upsert/PartialUpsertHandler.java |  8 +--
 .../pinot/segment/local/upsert/UpsertUtils.java    | 79 ++++++++++++++++++++--
 .../segment/local/utils/TableConfigUtils.java      | 12 ++--
 .../ImmutableSegmentImplUpsertSnapshotTest.java    |  2 +-
 .../mutable/MutableSegmentImplTestUtils.java       |  4 +-
 .../mutable/MutableSegmentImplUpsertTest.java      | 64 ++++++++++++++----
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 15 ++--
 .../local/upsert/PartialUpsertHandlerTest.java     |  7 +-
 .../src/test/resources/data/test_upsert_data.json  | 21 +++++-
 .../test/resources/data/test_upsert_schema.json    |  4 ++
 .../apache/pinot/spi/config/table/TableConfig.java |  4 +-
 .../pinot/spi/config/table/UpsertConfig.java       | 56 +++++++--------
 .../pinot/spi/config/table/UpsertConfigTest.java   |  3 +-
 25 files changed, 443 insertions(+), 114 deletions(-)

diff --git a/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertConfig.json b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertConfig.json
new file mode 100644
index 0000000000..8e273a0514
--- /dev/null
+++ b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertConfig.json
@@ -0,0 +1,41 @@
+{
+  "tableName": "testTableUpsert",
+  "segmentsConfig": {
+    "replication": "1",
+    "schemaName": "testTable",
+    "timeColumnName": "time"
+  },
+  "fieldConfigList": [
+    {
+      "encodingType": "DICTIONARY",
+      "indexTypes": [
+        "INVERTED",
+        "RANGE"
+      ],
+      "name": "hits"
+    }
+  ],
+  "tableIndexConfig": {
+    "invertedIndexColumns": [
+      "hits"
+    ],
+    "rangeIndexColumns": [
+      "hits"
+    ],
+    "loadMode": "HEAP"
+  },
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  },
+  "tableType": "REALTIME",
+  "upsertConfig": {
+    "mode": "PARTIAL",
+    "comparisonColumn": "_comparison_column",
+    "partialUpsertStrategies": {},
+    "defaultPartialUpsertStrategy": "OVERWRITE",
+    "hashFunction": "MURMUR3",
+    "enableSnapshot": false
+  },
+  "metadata": {}
+}
diff --git a/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertMultipleComparisonColumnConfig.json b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertMultipleComparisonColumnConfig.json
new file mode 100644
index 0000000000..06ac7c2624
--- /dev/null
+++ b/pinot-common/src/test/resources/testConfigs/multipleIndexPartialUpsertMultipleComparisonColumnConfig.json
@@ -0,0 +1,41 @@
+{
+  "tableName": "testTableUpsertMultiComparison",
+  "segmentsConfig": {
+    "replication": "1",
+    "schemaName": "testTable",
+    "timeColumnName": "time"
+  },
+  "fieldConfigList": [
+    {
+      "encodingType": "DICTIONARY",
+      "indexTypes": [
+        "INVERTED",
+        "RANGE"
+      ],
+      "name": "hits"
+    }
+  ],
+  "tableIndexConfig": {
+    "invertedIndexColumns": [
+      "hits"
+    ],
+    "rangeIndexColumns": [
+      "hits"
+    ],
+    "loadMode": "HEAP"
+  },
+  "tenants": {
+    "broker": "DefaultTenant",
+    "server": "DefaultTenant"
+  },
+  "tableType": "REALTIME",
+  "upsertConfig": {
+    "mode": "PARTIAL",
+    "comparisonColumns": ["_comparison_column", "foo"],
+    "partialUpsertStrategies": {},
+    "defaultPartialUpsertStrategy": "OVERWRITE",
+    "hashFunction": "MURMUR3",
+    "enableSnapshot": false
+  },
+  "metadata": {}
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java
index 363b288ef6..0c0fdaa8e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java
@@ -129,14 +129,25 @@ public class PinotUpsertRestletResource {
     // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
     // Here we only calculate the map content size. TODO: Add the map entry size and the array size within the map.
     int bytesPerValue = 60;
-    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
-    if (comparisonColumn != null) {
-      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
-      if (!dt.isFixedWidth()) {
-        String msg = "Not support data types for the comparison column";
-        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
-      } else {
-        bytesPerValue = 52 + dt.size();
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null) {
+      int bytesPerArrayElem = 8;  // object ref
+      bytesPerValue = 52;
+      for (String columnName : comparisonColumns) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(columnName).getDataType();
+        if (!dt.isFixedWidth()) {
+          String msg = "Not support data types for the comparison column";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          if (comparisonColumns.size() == 1) {
+            bytesPerValue += dt.size();
+          } else {
+            bytesPerValue += bytesPerArrayElem + dt.size();
+          }
+        }
+      }
+      if (comparisonColumns.size() > 1) {
+        bytesPerValue += 48 + 4;  // array overhead + comparableIndex integer
       }
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 28a24ad937..9d962dbe3f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1394,7 +1394,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
-            .setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn())
+            .setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
             .setFieldConfigList(tableConfig.getFieldConfigList());
 
     // Create message decoder
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index f6df4e9b4e..66fbf400d0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -130,7 +130,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            "daysSinceEpoch", HashFunction.NONE, null, false, serverMetrics), new ThreadSafeMutableRoaringBitmap());
+            Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, false, serverMetrics),
+        new ThreadSafeMutableRoaringBitmap());
   }
 
   @AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index f3e97443cb..df75e10333 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -62,6 +62,7 @@ import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.local.upsert.ComparisonColumns;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.RecordInfo;
 import org.apache.pinot.segment.local.utils.FixedIntArrayOffHeapIdMap;
@@ -163,7 +164,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders;
 
   private final UpsertConfig.Mode _upsertMode;
-  private final String _upsertComparisonColumn;
+  private final List<String> _upsertComparisonColumns;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   // The valid doc ids are maintained locally instead of in the upsert metadata manager because:
@@ -408,12 +409,13 @@ public class MutableSegmentImpl implements MutableSegment {
           "Metrics aggregation and upsert cannot be enabled together");
       _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
       _validDocIds = new ThreadSafeMutableRoaringBitmap();
-      String upsertComparisonColumn = config.getUpsertComparisonColumn();
-      _upsertComparisonColumn = upsertComparisonColumn != null ? upsertComparisonColumn : _timeColumnName;
+      List<String> upsertComparisonColumns = config.getUpsertComparisonColumns();
+      _upsertComparisonColumns =
+          upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
     } else {
       _partitionUpsertMetadataManager = null;
       _validDocIds = null;
-      _upsertComparisonColumn = null;
+      _upsertComparisonColumns = null;
     }
   }
 
@@ -558,15 +560,45 @@ public class MutableSegmentImpl implements MutableSegment {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
 
     if (isUpsertEnabled()) {
-      Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
-      Preconditions.checkState(upsertComparisonValue instanceof Comparable,
-          "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
-      return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
+      if (_upsertComparisonColumns.size() > 1) {
+        return multiComparisonRecordInfo(primaryKey, docId, row);
+      }
+      Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0));
+      return new RecordInfo(primaryKey, docId, comparisonValue);
     }
 
     return new RecordInfo(primaryKey, docId, null);
   }
 
+  private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) {
+    int numComparisonColumns = _upsertComparisonColumns.size();
+    Comparable[] comparisonValues = new Comparable[numComparisonColumns];
+
+    int comparableIndex = -1;
+    for (int i = 0; i < numComparisonColumns; i++) {
+      String columnName = _upsertComparisonColumns.get(i);
+
+      if (!row.isNullValue(columnName)) {
+        // Inbound records may only have exactly 1 non-null value in one of the comparison columns, i.e. comparison
+        // columns are mutually exclusive. If comparableIndex has already been modified from its initialized value,
+        // that means there must have already been a non-null value processed and therefore processing an additional
+        // non-null value would be an error.
+        Preconditions.checkState(comparableIndex == -1,
+            "Documents must have exactly 1 non-null comparison column value");
+
+        comparableIndex = i;
+
+        Object comparisonValue = row.getValue(columnName);
+        Preconditions.checkState(comparisonValue instanceof Comparable,
+            "Upsert comparison column: %s must be comparable", columnName);
+        comparisonValues[i] = (Comparable) comparisonValue;
+      }
+    }
+    Preconditions.checkState(comparableIndex != -1,
+        "Documents must have exactly 1 non-null comparison column value");
+    return new RecordInfo(primaryKey, docId, new ComparisonColumns(comparisonValues, comparableIndex));
+  }
+
   private void updateDictionary(GenericRow row) {
     for (Map.Entry<String, IndexContainer> entry : _indexContainerMap.entrySet()) {
       IndexContainer indexContainer = entry.getValue();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 9af8367b49..7068d3e382 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -62,7 +62,7 @@ public class RealtimeSegmentConfig {
   private final boolean _aggregateMetrics;
   private final boolean _nullHandlingEnabled;
   private final UpsertConfig.Mode _upsertMode;
-  private final String _upsertComparisonColumn;
+  private final List<String> _upsertComparisonColumns;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   private final String _consumerDir;
@@ -77,7 +77,7 @@ public class RealtimeSegmentConfig {
       Map<String, H3IndexConfig> h3IndexConfigs, SegmentZKMetadata segmentZKMetadata, boolean offHeap,
       PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
       PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
-      String consumerDir, UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
+      String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
       PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
       List<AggregationConfig> ingestionAggregationConfigs) {
@@ -106,7 +106,7 @@ public class RealtimeSegmentConfig {
     _nullHandlingEnabled = nullHandlingEnabled;
     _consumerDir = consumerDir;
     _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
-    _upsertComparisonColumn = upsertComparisonColumn;
+    _upsertComparisonColumns = upsertComparisonColumns;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _partitionDedupMetadataManager = partitionDedupMetadataManager;
     _fieldConfigList = fieldConfigList;
@@ -222,8 +222,8 @@ public class RealtimeSegmentConfig {
     return _partitionDedupMetadataManager != null;
   }
 
-  public String getUpsertComparisonColumn() {
-    return _upsertComparisonColumn;
+  public List<String> getUpsertComparisonColumns() {
+    return _upsertComparisonColumns;
   }
 
   public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
@@ -268,7 +268,7 @@ public class RealtimeSegmentConfig {
     private boolean _nullHandlingEnabled = false;
     private String _consumerDir;
     private UpsertConfig.Mode _upsertMode;
-    private String _upsertComparisonColumn;
+    private List<String> _upsertComparisonColumns;
     private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
     private PartitionDedupMetadataManager _partitionDedupMetadataManager;
     private List<FieldConfig> _fieldConfigList;
@@ -410,8 +410,8 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
-    public Builder setUpsertComparisonColumn(String upsertComparisonColumn) {
-      _upsertComparisonColumn = upsertComparisonColumn;
+    public Builder setUpsertComparisonColumns(List<String> upsertComparisonColumns) {
+      _upsertComparisonColumns = upsertComparisonColumns;
       return this;
     }
 
@@ -440,7 +440,7 @@ public class RealtimeSegmentConfig {
           _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
           _textIndexColumns, _fstIndexColumns, _jsonIndexConfigs, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
-          _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _partitionUpsertMetadataManager,
+          _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _partitionUpsertMetadataManager,
           _partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 361052fdc2..066ef9cac1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -54,7 +54,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final String _tableNameWithType;
   protected final int _partitionId;
   protected final List<String> _primaryKeyColumns;
-  protected final String _comparisonColumn;
+  protected final List<String> _comparisonColumns;
   protected final HashFunction _hashFunction;
   protected final PartialUpsertHandler _partialUpsertHandler;
   protected final boolean _enableSnapshot;
@@ -72,12 +72,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected int _numOutOfOrderEvents = 0;
 
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
-      List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
+      List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
       @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
-    _comparisonColumn = comparisonColumn;
+    _comparisonColumns = comparisonColumns;
     _hashFunction = hashFunction;
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
@@ -131,8 +131,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       immutableSegmentImpl.deleteValidDocIdsSnapshot();
     }
 
-    try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
-        _comparisonColumn)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
+        _comparisonColumns)) {
       Iterator<RecordInfo> recordInfoIterator;
       if (validDocIds != null) {
         recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds);
@@ -225,8 +225,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       return;
     }
 
-    try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
-        _comparisonColumn)) {
+    try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
+        _comparisonColumns)) {
       Iterator<RecordInfo> recordInfoIterator =
           UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
       replaceSegment(segment, null, recordInfoIterator, oldSegment);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 7147341b69..2ceb5ddc72 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.concurrent.ThreadSafe;
@@ -35,7 +36,7 @@ import org.apache.pinot.spi.data.Schema;
 public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetadataManager {
   protected String _tableNameWithType;
   protected List<String> _primaryKeyColumns;
-  protected String _comparisonColumn;
+  protected List<String> _comparisonColumns;
   protected HashFunction _hashFunction;
   protected PartialUpsertHandler _partialUpsertHandler;
   protected boolean _enableSnapshot;
@@ -54,9 +55,9 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
     Preconditions.checkArgument(!CollectionUtils.isEmpty(_primaryKeyColumns),
         "Primary key columns must be configured for upsert enabled table: %s", _tableNameWithType);
 
-    _comparisonColumn = upsertConfig.getComparisonColumn();
-    if (_comparisonColumn == null) {
-      _comparisonColumn = tableConfig.getValidationConfig().getTimeColumnName();
+    _comparisonColumns = upsertConfig.getComparisonColumns();
+    if (_comparisonColumns == null) {
+      _comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
     }
 
     _hashFunction = upsertConfig.getHashFunction();
@@ -67,7 +68,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
           "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
       _partialUpsertHandler =
           new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
-              _comparisonColumn);
+              _comparisonColumns);
     }
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
new file mode 100644
index 0000000000..de223f27d3
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable<ComparisonColumns> {
+  private final Comparable[] _values;
+  private final int _comparableIndex;
+
+  public ComparisonColumns(Comparable[] values, int comparableIndex) {
+    _values = values;
+    _comparableIndex = comparableIndex;
+  }
+
+  public Comparable[] getValues() {
+    return _values;
+  }
+
+  @Override
+  public int compareTo(ComparisonColumns other) {
+    // _comparisonColumns should have at most one non-null comparison value. If not, it is the user's responsibility.
+    // There is no attempt to guarantee behavior in the case where there are multiple non-null values
+    int comparisonResult;
+
+    Comparable comparisonValue = _values[_comparableIndex];
+    Comparable otherComparisonValue = other.getValues()[_comparableIndex];
+
+    if (otherComparisonValue == null) {
+      // Keep this record because the existing record has no value for the same comparison column, therefore the
+      // (lack of) existing value could not possibly cause the new value to be rejected.
+      comparisonResult = 1;
+    } else {
+      comparisonResult = comparisonValue.compareTo(otherComparisonValue);
+    }
+
+    if (comparisonResult >= 0) {
+      // TODO(egalpin):  This method currently may have side-effects on _values. Depending on the comparison result,
+      //  entities from {@param other} may be merged into _values. This really should not be done implicitly as part
+      //  of compareTo, but has been implemented this way to minimize the changes required within all subclasses of
+      //  {@link BasePartitionUpsertMetadataManager}. Ideally, this merge should only be triggered explicitly by
+      //  implementations of {@link BasePartitionUpsertMetadataManager}.
+      for (int i = 0; i < _values.length; i++) {
+        if (i != _comparableIndex) {
+          _values[i] = other._values[i];
+        }
+      }
+    }
+    return comparisonResult;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index b1687afdf8..f7529a0011 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -57,9 +57,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   private final GenericRow _reuse = new GenericRow();
 
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
-      List<String> primaryKeyColumns, String comparisonColumn, HashFunction hashFunction,
+      List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
       @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
-    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumn, hashFunction, partialUpsertHandler,
+    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, hashFunction, partialUpsertHandler,
         enableSnapshot, serverMetrics);
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 67d6fe773b..cfc6e529a4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,7 +36,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
   public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
-            _comparisonColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
+            _comparisonColumns, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 3444a5ac54..f18a95ca00 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -35,13 +35,13 @@ public class PartialUpsertHandler {
   // _column2Mergers maintains the mapping of merge strategies per columns.
   private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
   private final PartialUpsertMerger _defaultPartialUpsertMerger;
-  private final String _comparisonColumn;
+  private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
 
   public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) {
+      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
     _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
-    _comparisonColumn = comparisonColumn;
+    _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
     for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
@@ -66,7 +66,7 @@ public class PartialUpsertHandler {
    */
   public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
     for (String column : previousRecord.getFieldToValueMap().keySet()) {
-      if (!_primaryKeyColumns.contains(column) && !_comparisonColumn.equals(column)) {
+      if (!_primaryKeyColumns.contains(column) && !_comparisonColumns.contains(column)) {
         if (!previousRecord.isNullValue(column)) {
           if (newRecord.isNullValue(column)) {
             newRecord.putValue(column, previousRecord.getValue(column));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 8ac5fee8d3..acda88a27b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -75,19 +75,31 @@ public class UpsertUtils {
     };
   }
 
+  public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns,
+      List<String> comparisonColumns) {
+    if (comparisonColumns.size() > 1) {
+      return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns);
+    }
+    return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns.get(0));
+  }
+
   public static class RecordInfoReader implements Closeable {
     public final PrimaryKeyReader _primaryKeyReader;
-    public final PinotSegmentColumnReader _comparisonColumnReader;
+    public final ComparisonColumnReader _comparisonColumnReader;
+
+    public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, List<String> comparisonColumns) {
+      _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
+      _comparisonColumnReader = new MultiComparisonColumnReader(segment, comparisonColumns);
+    }
 
     public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, String comparisonColumn) {
       _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
-      _comparisonColumnReader = new PinotSegmentColumnReader(segment, comparisonColumn);
+      _comparisonColumnReader = new SingleComparisonColumnReader(segment, comparisonColumn);
     }
 
     public RecordInfo getRecordInfo(int docId) {
       PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
-      Comparable comparisonValue = (Comparable) getValue(_comparisonColumnReader, docId);
-      return new RecordInfo(primaryKey, docId, comparisonValue);
+      return new RecordInfo(primaryKey, docId, _comparisonColumnReader.getComparisonValue(docId));
     }
 
     @Override
@@ -134,8 +146,85 @@ public class UpsertUtils {
     }
   }
 
-  private static Object getValue(PinotSegmentColumnReader columnReader, int docId) {
+  /**
+   * Reads the value stored for {@code docId}, wrapping a raw {@code byte[]} into a {@link ByteArray}. A bare
+   * {@code byte[]} is not {@link Comparable}, so callers that cast the result to {@code Comparable} must go
+   * through this method rather than calling {@link PinotSegmentColumnReader#getValue(int)} directly.
+   */
+  static Object getValue(PinotSegmentColumnReader columnReader, int docId) {
     Object value = columnReader.getValue(docId);
     return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
   }
+
+  /**
+   * Reads the upsert comparison value of a document from a segment.
+   */
+  public interface ComparisonColumnReader extends Closeable {
+    /**
+     * Returns the comparison value of the document with the given doc id.
+     */
+    Comparable getComparisonValue(int docId);
+  }
+
+  /**
+   * {@link ComparisonColumnReader} backed by a single comparison column.
+   */
+  public static class SingleComparisonColumnReader implements UpsertUtils.ComparisonColumnReader {
+    private final PinotSegmentColumnReader _comparisonColumnReader;
+
+    public SingleComparisonColumnReader(IndexSegment segment, String comparisonColumn) {
+      _comparisonColumnReader = new PinotSegmentColumnReader(segment, comparisonColumn);
+    }
+
+    @Override
+    public Comparable getComparisonValue(int docId) {
+      // Read through UpsertUtils#getValue (as MultiComparisonColumnReader does) so that byte[] values are
+      // wrapped into ByteArray before the cast; casting a raw byte[] to Comparable throws ClassCastException.
+      return (Comparable) UpsertUtils.getValue(_comparisonColumnReader, docId);
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      _comparisonColumnReader.close();
+    }
+  }
+
+  /**
+   * {@link ComparisonColumnReader} backed by multiple comparison columns; the per-column values of a document
+   * are returned together as one {@link ComparisonColumns}.
+   */
+  public static class MultiComparisonColumnReader implements UpsertUtils.ComparisonColumnReader {
+    private final PinotSegmentColumnReader[] _comparisonColumnReaders;
+
+    public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) {
+      _comparisonColumnReaders = new PinotSegmentColumnReader[comparisonColumns.size()];
+      for (int i = 0; i < comparisonColumns.size(); i++) {
+        _comparisonColumnReaders[i] = new PinotSegmentColumnReader(segment, comparisonColumns.get(i));
+      }
+    }
+
+    @Override
+    public Comparable getComparisonValue(int docId) {
+      Comparable[] comparisonColumns = new Comparable[_comparisonColumnReaders.length];
+
+      for (int i = 0; i < _comparisonColumnReaders.length; i++) {
+        PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
+        Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
+        comparisonColumns[i] = comparisonValue;
+      }
+
+      // Note that the comparable index is negative here to indicate that this instance could be the argument to
+      // ComparisonColumns#compareTo, but should never call compareTo itself.
+      return new ComparisonColumns(comparisonColumns, -1);
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      for (PinotSegmentColumnReader comparisonColumnReader : _comparisonColumnReaders) {
+        comparisonColumnReader.close();
+      }
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 987d10de6c..046c0175e7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -573,9 +573,11 @@ public final class TableConfigUtils {
           "The upsert table cannot have star-tree index.");
 
       // comparison column exists
-      if (tableConfig.getUpsertConfig().getComparisonColumn() != null) {
-        String comparisonCol = tableConfig.getUpsertConfig().getComparisonColumn();
-        Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema");
+      if (tableConfig.getUpsertConfig().getComparisonColumns() != null) {
+        List<String> comparisonCols = tableConfig.getUpsertConfig().getComparisonColumns();
+        for (String comparisonCol : comparisonCols) {
+          Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema");
+        }
       }
     }
     validateAggregateMetricsForUpsertConfig(tableConfig);
@@ -644,8 +646,8 @@ public final class TableConfigUtils {
       UpsertConfig.Strategy columnStrategy = entry.getValue();
       Preconditions.checkState(!primaryKeyColumns.contains(column), "Merger cannot be applied to primary key columns");
 
-      if (upsertConfig.getComparisonColumn() != null) {
-        Preconditions.checkState(!upsertConfig.getComparisonColumn().equals(column),
+      if (upsertConfig.getComparisonColumns() != null) {
+        Preconditions.checkState(!upsertConfig.getComparisonColumns().contains(column),
             "Merger cannot be applied to comparison column");
       } else {
         Preconditions.checkState(!tableConfig.getValidationConfig().getTimeColumnName().equals(column),
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
index 6c31c3981f..4f3858d55c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImplUpsertSnapshotTest.java
@@ -118,7 +118,7 @@ public class ImmutableSegmentImplUpsertSnapshotTest {
     ServerMetrics serverMetrics = Mockito.mock(ServerMetrics.class);
     _partitionUpsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            "daysSinceEpoch", HashFunction.NONE, null, true, serverMetrics);
+            Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, true, serverMetrics);
 
     _immutableSegmentImpl.enableUpsert(_partitionUpsertMetadataManager, new ThreadSafeMutableRoaringBitmap());
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 4c0a924c81..04a6e09783 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -96,7 +96,7 @@ public class MutableSegmentImplTestUtils {
     when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
 
     UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode();
-    String comparisonColumn = upsertConfig == null ? null : upsertConfig.getComparisonColumn();
+    List<String> comparisonColumns = upsertConfig == null ? null : upsertConfig.getComparisonColumns();
     RealtimeSegmentConfig realtimeSegmentConfig =
         new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
             .setStreamName(STREAM_NAME).setSchema(schema).setTimeColumnName(timeColumnName).setCapacity(100000)
@@ -105,7 +105,7 @@ public class MutableSegmentImplTestUtils {
             .setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
             .setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
             .setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
-            .setUpsertComparisonColumn(comparisonColumn)
+            .setUpsertComparisonColumns(comparisonColumns)
             .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
             .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
             .setIngestionAggregationConfigs(aggregationConfigs).build();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index a2439d9f8f..4f5ad8bc6c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -20,7 +20,9 @@ package org.apache.pinot.segment.local.indexsegment.mutable;
 
 import java.io.File;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
@@ -52,13 +54,26 @@ public class MutableSegmentImplUpsertTest {
   private MutableSegmentImpl _mutableSegmentImpl;
   private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
 
-  private void setup(HashFunction hashFunction)
+  private UpsertConfig createPartialUpsertConfig(HashFunction hashFunction) {
+    UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+    upsertConfigWithHash.setPartialUpsertStrategies(new HashMap<>());
+    upsertConfigWithHash.setDefaultPartialUpsertStrategy(UpsertConfig.Strategy.OVERWRITE);
+    upsertConfigWithHash.setComparisonColumns(Arrays.asList("secondsSinceEpoch", "otherComparisonColumn"));
+    upsertConfigWithHash.setHashFunction(hashFunction);
+    return upsertConfigWithHash;
+  }
+
+  private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) {
+    UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfigWithHash.setHashFunction(hashFunction);
+    return upsertConfigWithHash;
+  }
+
+  private void setup(UpsertConfig upsertConfigWithHash)
       throws Exception {
     URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
-    UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
-    upsertConfigWithHash.setHashFunction(hashFunction);
     _tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
             .build();
@@ -87,18 +102,43 @@ public class MutableSegmentImplUpsertTest {
   @Test
   public void testHashFunctions()
       throws Exception {
-    testUpsertIngestion(HashFunction.NONE);
-    testUpsertIngestion(HashFunction.MD5);
-    testUpsertIngestion(HashFunction.MURMUR3);
+    testUpsertIngestion(createFullUpsertConfig(HashFunction.NONE));
+    testUpsertIngestion(createFullUpsertConfig(HashFunction.MD5));
+    testUpsertIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
   }
 
-  private void testUpsertIngestion(HashFunction hashFunction)
+  @Test
+  public void testMultipleComparisonColumns()
       throws Exception {
-    setup(hashFunction);
+    testUpsertIngestion(createPartialUpsertConfig(HashFunction.NONE));
+    testUpsertIngestion(createPartialUpsertConfig(HashFunction.MD5));
+    testUpsertIngestion(createPartialUpsertConfig(HashFunction.MURMUR3));
+  }
+
+  private void testUpsertIngestion(UpsertConfig upsertConfig)
+      throws Exception {
+    setup(upsertConfig);
     ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
-    Assert.assertFalse(bitmap.contains(0));
-    Assert.assertTrue(bitmap.contains(1));
-    Assert.assertTrue(bitmap.contains(2));
-    Assert.assertFalse(bitmap.contains(3));
+    if (upsertConfig.getComparisonColumns() == null) {
+      // aa
+      Assert.assertFalse(bitmap.contains(0));
+      Assert.assertTrue(bitmap.contains(1));
+      Assert.assertFalse(bitmap.contains(2));
+      Assert.assertFalse(bitmap.contains(3));
+      // bb
+      Assert.assertFalse(bitmap.contains(4));
+      Assert.assertTrue(bitmap.contains(5));
+      Assert.assertFalse(bitmap.contains(6));
+    } else {
+      // aa
+      Assert.assertFalse(bitmap.contains(0));
+      Assert.assertFalse(bitmap.contains(1));
+      Assert.assertTrue(bitmap.contains(2));
+      Assert.assertFalse(bitmap.contains(3));
+      // bb
+      Assert.assertFalse(bitmap.contains(4));
+      Assert.assertTrue(bitmap.contains(5));
+      Assert.assertFalse(bitmap.contains(6));
+    }
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 6b4bf34368..1e3f130059 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -73,9 +73,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddReplaceRemoveSegment(HashFunction hashFunction, boolean enableSnapshot)
       throws IOException {
+    String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -217,7 +218,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
     List<RecordInfo> recordInfoList = new ArrayList<>();
     for (int i = 0; i < numRecords; i++) {
-      recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+      recordInfoList.add(
+          new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
     }
     return recordInfoList;
   }
@@ -286,7 +288,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
-    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value, comparisonValue);
+    assertEquals(((IntWrapper) recordLocation.getComparisonValue())._value,
+        comparisonValue);
   }
 
   @Test
@@ -299,9 +302,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
   private void verifyAddRecord(HashFunction hashFunction)
       throws IOException {
+    String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // Add the first segment
@@ -329,6 +333,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
 
     upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120)));
+
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -339,6 +344,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
     upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100)));
+
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
@@ -349,6 +355,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
 
     upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, new IntWrapper(100)));
+
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
     checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 913215c85d..fd335ad41b 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -43,7 +44,8 @@ public class PartialUpsertHandlerTest {
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
     PartialUpsertHandler handler =
-        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE,
+            Collections.singletonList("hoursSinceEpoch"));
 
     // both records are null.
     GenericRow previousRecord = new GenericRow();
@@ -98,7 +100,8 @@ public class PartialUpsertHandlerTest {
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
     partialUpsertStrategies.put("field1", UpsertConfig.Strategy.INCREMENT);
     PartialUpsertHandler handler =
-        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE, "hoursSinceEpoch");
+        new PartialUpsertHandler(schema, partialUpsertStrategies, UpsertConfig.Strategy.OVERWRITE,
+            Collections.singletonList("hoursSinceEpoch"));
 
     // previousRecord is null default value, while newRecord is not.
     GenericRow previousRecord = new GenericRow();
diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_data.json b/pinot-segment-local/src/test/resources/data/test_upsert_data.json
index 70fe90b4c1..5407326f9c 100644
--- a/pinot-segment-local/src/test/resources/data/test_upsert_data.json
+++ b/pinot-segment-local/src/test/resources/data/test_upsert_data.json
@@ -4,15 +4,30 @@
     "description" : "first",
     "secondsSinceEpoch": 1567205394
   },
+  {
+    "event_id": "aa",
+    "description" : "update",
+    "secondsSinceEpoch": 1567205397
+  },
+  {
+    "event_id": "aa",
+    "description" : "first arrival, other column",
+    "otherComparisonColumn": 1567205395
+  },
+  {
+    "event_id": "aa",
+    "description" : "late arrival, other column",
+    "otherComparisonColumn": 1567205393
+  },
   {
     "event_id": "bb",
     "description" : "first",
     "secondsSinceEpoch": 1567205396
   },
   {
-    "event_id": "aa",
-    "description" : "update",
-    "secondsSinceEpoch": 1567205397
+    "event_id": "bb",
+    "description" : "duplicate arrival",
+    "secondsSinceEpoch": 1567205396
   },
   {
     "event_id": "bb",
diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_schema.json b/pinot-segment-local/src/test/resources/data/test_upsert_schema.json
index 859c6f70c7..8e6b7d2849 100644
--- a/pinot-segment-local/src/test/resources/data/test_upsert_schema.json
+++ b/pinot-segment-local/src/test/resources/data/test_upsert_schema.json
@@ -8,6 +8,10 @@
     {
       "name": "description",
       "dataType": "STRING"
+    },
+    {
+      "name": "otherComparisonColumn",
+      "dataType": "LONG"
     }
   ],
   "timeFieldSpec": {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index a71ddb47f6..bae7b7c798 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -359,8 +359,8 @@ public class TableConfig extends BaseJsonConfig {
 
   @JsonIgnore
   @Nullable
-  public String getUpsertComparisonColumn() {
-    return _upsertConfig == null ? null : _upsertConfig.getComparisonColumn();
+  public List<String> getUpsertComparisonColumns() {
+    return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
   }
 
   @JsonProperty(TUNER_CONFIG_LIST_KEY)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 0c06d9e07d..68b532dc83 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pinot.spi.config.table;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyDescription;
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 
 
@@ -51,8 +51,8 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("default upsert strategy for partial mode")
   private Strategy _defaultPartialUpsertStrategy;
 
-  @JsonPropertyDescription("Column for upsert comparison, default to time column")
-  private String _comparisonColumn;
+  @JsonPropertyDescription("Columns for upsert comparison, default to time column")
+  private List<String> _comparisonColumns;
 
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
@@ -63,28 +63,6 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Custom configs for upsert metadata manager")
   private Map<String, String> _metadataManagerConfigs;
 
-  @Deprecated
-  public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode,
-      @JsonProperty("partialUpsertStrategies") @Nullable Map<String, Strategy> partialUpsertStrategies,
-      @JsonProperty("defaultPartialUpsertStrategy") @Nullable Strategy defaultPartialUpsertStrategy,
-      @JsonProperty("comparisonColumn") @Nullable String comparisonColumn,
-      @JsonProperty("hashFunction") @Nullable HashFunction hashFunction) {
-    Preconditions.checkArgument(mode != null, "Upsert mode must be configured");
-    _mode = mode;
-
-    if (mode == Mode.PARTIAL) {
-      _partialUpsertStrategies = partialUpsertStrategies != null ? partialUpsertStrategies : new HashMap<>();
-      _defaultPartialUpsertStrategy =
-          defaultPartialUpsertStrategy != null ? defaultPartialUpsertStrategy : Strategy.OVERWRITE;
-    } else {
-      _partialUpsertStrategies = null;
-      _defaultPartialUpsertStrategy = null;
-    }
-
-    _comparisonColumn = comparisonColumn;
-    _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
-  }
-
   public UpsertConfig(Mode mode) {
     _mode = mode;
   }
@@ -97,6 +75,10 @@ public class UpsertConfig extends BaseJsonConfig {
     return _mode;
   }
 
+  public void setMode(Mode mode) {
+    _mode = mode;
+  }
+
   public HashFunction getHashFunction() {
     return _hashFunction;
   }
@@ -110,8 +92,8 @@ public class UpsertConfig extends BaseJsonConfig {
     return _defaultPartialUpsertStrategy;
   }
 
-  public String getComparisonColumn() {
-    return _comparisonColumn;
+  public List<String> getComparisonColumns() {
+    return _comparisonColumns;
   }
 
   public boolean isEnableSnapshot() {
@@ -154,10 +136,22 @@ public class UpsertConfig extends BaseJsonConfig {
    * same primary key, the record with the larger value of the time column is picked as the
    * latest update.
    * However, there are cases when users need to use another column to determine the order.
-   * In such case, you can use option comparisonColumn to override the column used for comparison.
+   * In such cases, you can configure comparison column(s) to override the column used for comparison. When using
+   * multiple comparison columns, typically in the case of partial upserts, each input document is expected to
+   * have only a single non-null comparison column value. Multiple non-null values in an input document _will_
+   * result in undefined behaviour. Typically, one comparison column is allocated per distinct producer application
+   * of data in the case where there are multiple producers sinking to the same table.
    */
+  public void setComparisonColumns(List<String> comparisonColumns) {
+    if (CollectionUtils.isNotEmpty(comparisonColumns)) {
+      _comparisonColumns = comparisonColumns;
+    }
+  }
+
   public void setComparisonColumn(String comparisonColumn) {
-    _comparisonColumn = comparisonColumn;
+    if (comparisonColumn != null) {
+      _comparisonColumns = Collections.singletonList(comparisonColumn);
+    }
   }
 
   public void setEnableSnapshot(boolean enableSnapshot) {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
index df5875fe33..1311de9d41 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/table/UpsertConfigTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.config.table;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.testng.annotations.Test;
@@ -33,7 +34,7 @@ public class UpsertConfigTest {
     assertEquals(upsertConfig1.getMode(), UpsertConfig.Mode.FULL);
 
     upsertConfig1.setComparisonColumn("comparison");
-    assertEquals(upsertConfig1.getComparisonColumn(), "comparison");
+    assertEquals(upsertConfig1.getComparisonColumns(), Collections.singletonList("comparison"));
 
     upsertConfig1.setHashFunction(HashFunction.MURMUR3);
     assertEquals(upsertConfig1.getHashFunction(), HashFunction.MURMUR3);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org