You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/17 21:33:07 UTC

(pinot) branch master updated: Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c1fdb66ae8 Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)
c1fdb66ae8 is described below

commit c1fdb66ae8f8a0127e90f029739abc23229abe11
Author: Pratik Tibrewal <ti...@uber.com>
AuthorDate: Sat Nov 18 03:03:01 2023 +0530

    Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)
---
 .../realtime/RealtimeSegmentDataManager.java       |  2 +
 ...adataAndDictionaryAggregationPlanMakerTest.java |  4 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 12 +++-
 .../local/realtime/impl/RealtimeSegmentConfig.java | 28 +++++++-
 .../upsert/BasePartitionUpsertMetadataManager.java |  8 ++-
 .../upsert/BaseTableUpsertMetadataManager.java     |  2 -
 ...oncurrentMapPartitionUpsertMetadataManager.java | 19 +++---
 .../ConcurrentMapTableUpsertMetadataManager.java   |  2 +-
 .../segment/local/utils/TableConfigUtils.java      | 12 ++++
 .../mutable/MutableSegmentImplTestUtils.java       |  6 +-
 .../MutableSegmentImplUpsertComparisonColTest.java | 78 +++++++++++++++++++---
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 51 +++++++-------
 .../segment/local/utils/TableConfigUtilsTest.java  | 43 ++++++++++++
 .../data/test_upsert_comparison_col_schema.json    |  4 ++
 .../apache/pinot/spi/config/table/TableConfig.java | 11 +++
 .../pinot/spi/config/table/UpsertConfig.java       | 16 ++++-
 .../org/apache/pinot/spi/utils/BooleanUtils.java   | 11 +++
 17 files changed, 253 insertions(+), 56 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 087f8c2cd2..3b66bc97fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1438,6 +1438,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
             .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
             .setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
             .setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
+            .setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn())
+            .setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
             .setFieldConfigList(tableConfig.getFieldConfigList());
 
     // Create message decoder
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 9db6b12632..807ef5f9a0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
     _upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     ((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
         new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
-            Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
-            serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
+            Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
+            false, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
   }
 
   @AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 7adeae3d7b..c47d420c82 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -165,6 +165,8 @@ public class MutableSegmentImpl implements MutableSegment {
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final List<String> _upsertComparisonColumns;
   private final String _deleteRecordColumn;
+  private final String _upsertOutOfOrderRecordColumn;
+  private final boolean _upsertDropOutOfOrderRecord;
   // The valid doc ids are maintained locally instead of in the upsert metadata manager because:
   // 1. There is only one consuming segment per partition, the committed segments do not need to modify the valid doc
   //    ids for the consuming segment.
@@ -381,6 +383,8 @@ public class MutableSegmentImpl implements MutableSegment {
       _upsertComparisonColumns =
           upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
       _deleteRecordColumn = config.getUpsertDeleteRecordColumn();
+      _upsertOutOfOrderRecordColumn = config.getUpsertOutOfOrderRecordColumn();
+      _upsertDropOutOfOrderRecord = config.isUpsertDropOutOfOrderRecord();
       _validDocIds = new ThreadSafeMutableRoaringBitmap();
       if (_deleteRecordColumn != null) {
         _queryableDocIds = new ThreadSafeMutableRoaringBitmap();
@@ -392,6 +396,8 @@ public class MutableSegmentImpl implements MutableSegment {
       _deleteRecordColumn = null;
       _validDocIds = null;
       _queryableDocIds = null;
+      _upsertOutOfOrderRecordColumn = null;
+      _upsertDropOutOfOrderRecord = false;
     }
   }
 
@@ -496,7 +502,11 @@ public class MutableSegmentImpl implements MutableSegment {
       // segment indexing or addNewRow call errors out in those scenario, there can be metadata inconsistency where
       // a key is pointing to some other key's docID
       // TODO fix this metadata mismatch scenario
-      if (_partitionUpsertMetadataManager.addRecord(this, recordInfo)) {
+      boolean isOutOfOrderRecord = !_partitionUpsertMetadataManager.addRecord(this, recordInfo);
+      if (_upsertOutOfOrderRecordColumn != null) {
+        updatedRow.putValue(_upsertOutOfOrderRecordColumn, BooleanUtils.toInt(isOutOfOrderRecord));
+      }
+      if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
         updateDictionary(updatedRow);
         addNewRow(numDocsIndexed, updatedRow);
         // Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 2ad4a177d4..a95a68893c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -63,6 +63,8 @@ public class RealtimeSegmentConfig {
   private final UpsertConfig.Mode _upsertMode;
   private final List<String> _upsertComparisonColumns;
   private final String _upsertDeleteRecordColumn;
+  private final String _upsertOutOfOrderRecordColumn;
+  private final boolean _upsertDropOutOfOrderRecord;
   private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
   private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
   private final String _consumerDir;
@@ -76,7 +78,8 @@ public class RealtimeSegmentConfig {
       PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
       PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
       String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
-      String upsertDeleteRecordColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+      String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn, boolean upsertDropOutOfOrderRecord,
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
       List<AggregationConfig> ingestionAggregationConfigs) {
     _tableNameWithType = tableNameWithType;
@@ -100,6 +103,8 @@ public class RealtimeSegmentConfig {
     _upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
     _upsertComparisonColumns = upsertComparisonColumns;
     _upsertDeleteRecordColumn = upsertDeleteRecordColumn;
+    _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
+    _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
     _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
     _partitionDedupMetadataManager = partitionDedupMetadataManager;
     _fieldConfigList = fieldConfigList;
@@ -195,6 +200,14 @@ public class RealtimeSegmentConfig {
     return _upsertDeleteRecordColumn;
   }
 
+  public String getUpsertOutOfOrderRecordColumn() {
+    return _upsertOutOfOrderRecordColumn;
+  }
+
+  public boolean isUpsertDropOutOfOrderRecord() {
+    return _upsertDropOutOfOrderRecord;
+  }
+
   public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
     return _partitionUpsertMetadataManager;
   }
@@ -233,6 +246,8 @@ public class RealtimeSegmentConfig {
     private UpsertConfig.Mode _upsertMode;
     private List<String> _upsertComparisonColumns;
     private String _upsertDeleteRecordColumn;
+    private String _upsertOutOfOrderRecordColumn;
+    private boolean _upsertDropOutOfOrderRecord;
     private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
     private PartitionDedupMetadataManager _partitionDedupMetadataManager;
     private List<FieldConfig> _fieldConfigList;
@@ -373,6 +388,16 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setUpsertOutOfOrderRecordColumn(String upsertOutOfOrderRecordColumn) {
+      _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
+      return this;
+    }
+
+    public Builder setUpsertDropOutOfOrderRecord(boolean upsertDropOutOfOrderRecord) {
+      _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
+      return this;
+    }
+
     public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
       _partitionUpsertMetadataManager = partitionUpsertMetadataManager;
       return this;
@@ -403,6 +428,7 @@ public class RealtimeSegmentConfig {
           _capacity, _avgNumMultiValues, Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
           _memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
           _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _upsertDeleteRecordColumn,
+          _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
           _partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
           _ingestionAggregationConfigs);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 7d3de0346e..0e54d399bf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -71,7 +71,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected final PartialUpsertHandler _partialUpsertHandler;
   protected final boolean _enableSnapshot;
   protected final double _metadataTTL;
-  protected final boolean _dropOutOfOrderRecord;
   protected final File _tableIndexDir;
   protected final ServerMetrics _serverMetrics;
   protected final Logger _logger;
@@ -99,7 +98,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _primaryKeyColumns = primaryKeyColumns;
@@ -109,7 +108,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     _partialUpsertHandler = partialUpsertHandler;
     _enableSnapshot = enableSnapshot;
     _metadataTTL = metadataTTL;
-    _dropOutOfOrderRecord = dropOutOfOrderRecord;
     _tableIndexDir = tableIndexDir;
     _snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
     _serverMetrics = serverMetrics;
@@ -362,6 +360,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  /**
+   Returns {@code true} when the record is added to the upsert metadata manager,
+   {@code false} when the record is out-of-order thus not added.
+   */
   protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo);
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 7b6c66c0ae..bc783480b9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -73,7 +73,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   protected ServerMetrics _serverMetrics;
   protected HelixManager _helixManager;
   protected ExecutorService _segmentPreloadExecutor;
-  protected boolean _dropOutOfOrderRecord;
 
   private volatile boolean _isPreloading = false;
 
@@ -112,7 +111,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
 
     _enableSnapshot = upsertConfig.isEnableSnapshot();
     _metadataTTL = upsertConfig.getMetadataTTL();
-    _dropOutOfOrderRecord = upsertConfig.isDropOutOfOrderRecord();
     _tableIndexDir = tableDataManager.getTableDataDir();
     _serverMetrics = serverMetrics;
     _helixManager = helixManager;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 47f2fa2a96..435bab20d1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -61,9 +61,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
       List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
       HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
-      boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
-    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
-        partialUpsertHandler, enableSnapshot, dropOutOfOrderRecord, metadataTTL, tableIndexDir, serverMetrics);
+      double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+    super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
+        hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
   }
 
   @Override
@@ -240,9 +240,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     persistWatermark(_largestSeenComparisonValue);
   }
 
+  /**
+   Returns {@code true} when the record is added to the upsert metadata manager,
+   {@code false} when the record is out-of-order thus not added.
+   */
   @Override
   protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
-    AtomicBoolean shouldDropRecord = new AtomicBoolean(false);
+    AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
     ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
     ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
     int newDocId = recordInfo.getDocId();
@@ -273,9 +277,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
               return new RecordLocation(segment, newDocId, newComparisonValue);
             } else {
               handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
-              // this is a out-of-order record, if upsert config _dropOutOfOrderRecord is true, then set
-              // shouldDropRecord to true. This method returns inverse of this value
-              shouldDropRecord.set(_dropOutOfOrderRecord);
+              // This is an out-of-order record: flag it so that the method returns false (record not added)
+              isOutOfOrderRecord.set(true);
               return currentRecordLocation;
             }
           } else {
@@ -288,7 +291,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
     // Update metrics
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         _primaryKeyToRecordLocationMap.size());
-    return !shouldDropRecord.get();
+    return !isOutOfOrderRecord.get();
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 55db795883..3380203656 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,7 +37,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
     return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
             _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
-            _enableSnapshot, _dropOutOfOrderRecord, _metadataTTL, _tableIndexDir, _serverMetrics));
+            _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index f6d9ea957b..7605925432 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -723,6 +723,18 @@ public final class TableConfigUtils {
             fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
             "The delete record column must be a single-valued BOOLEAN column");
       }
+
+      String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn();
+      Preconditions.checkState(
+          outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(),
+          "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table");
+
+      if (outOfOrderRecordColumn != null) {
+        FieldSpec fieldSpec = schema.getFieldSpecFor(outOfOrderRecordColumn);
+        Preconditions.checkState(
+            fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
+            "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
+      }
     }
 
     Preconditions.checkState(
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 10270e5b6f..ade22fcad6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -100,6 +100,8 @@ public class MutableSegmentImplTestUtils {
 
     UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode();
     List<String> comparisonColumns = upsertConfig == null ? null : upsertConfig.getComparisonColumns();
+    boolean isUpsertDropOutOfOrderRecord = upsertConfig == null ? false : upsertConfig.isDropOutOfOrderRecord();
+    String upsertOutOfOrderRecordColumn = upsertConfig == null ? null : upsertConfig.getOutOfOrderRecordColumn();
     DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, true);
     RealtimeSegmentConfig.Builder segmentConfBuilder = new RealtimeSegmentConfig.Builder()
         .setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
@@ -114,7 +116,9 @@ public class MutableSegmentImplTestUtils {
         .setUpsertComparisonColumns(comparisonColumns)
         .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
         .setPartitionDedupMetadataManager(partitionDedupMetadataManager)
-        .setIngestionAggregationConfigs(aggregationConfigs);
+        .setIngestionAggregationConfigs(aggregationConfigs)
+        .setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
+        .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn);
     for (Map.Entry<String, JsonIndexConfig> entry : jsonIndexConfigs.entrySet()) {
       segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue());
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index aae9b9cec6..fe17e40dc2 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
 import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
+import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -37,10 +38,10 @@ import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.utils.BooleanUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -55,16 +56,20 @@ public class MutableSegmentImplUpsertComparisonColTest {
   private static MutableSegmentImpl _mutableSegmentImpl;
   private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
 
-  @BeforeClass
-  public void setup()
+  private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) {
+    UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfigWithHash.setHashFunction(hashFunction);
+    upsertConfigWithHash.setComparisonColumn("offset");
+    return upsertConfigWithHash;
+  }
+
+  public void setup(UpsertConfig upsertConfig)
       throws Exception {
     URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
     URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
-    UpsertConfig offsetUpsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
-    offsetUpsertConfig.setComparisonColumn("offset");
     _tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(offsetUpsertConfig)
+        new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfig)
             .build();
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
@@ -74,7 +79,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
     _partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
     _mutableSegmentImpl =
         MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
-            Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch",
+            Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch",
             _partitionUpsertMetadataManager, null);
     GenericRow reuse = new GenericRow();
     try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
@@ -89,12 +94,69 @@ public class MutableSegmentImplUpsertComparisonColTest {
   }
 
   @Test
-  public void testUpsertIngestion() {
+  public void testHashFunctions()
+      throws Exception {
+    testUpsertIngestion(createFullUpsertConfig(HashFunction.NONE));
+    testUpsertIngestion(createFullUpsertConfig(HashFunction.MD5));
+    testUpsertIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
+  }
+
+  @Test
+  public void testUpsertDropOutOfOrderRecord()
+      throws Exception {
+    testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.NONE));
+    testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.MD5));
+    testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
+  }
+
+  @Test
+  public void testUpsertOutOfOrderRecordColumn()
+      throws Exception {
+    testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.NONE));
+    testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.MD5));
+    testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
+  }
+
+  public void testUpsertIngestion(UpsertConfig upsertConfig)
+      throws Exception {
+    setup(upsertConfig);
+    ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
+    // Note: the offset column, not the time column, determines the ordering of records
+    Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4);
+    Assert.assertFalse(bitmap.contains(0));
+    Assert.assertTrue(bitmap.contains(1));
+    Assert.assertTrue(bitmap.contains(2));
+    Assert.assertFalse(bitmap.contains(3));
+  }
+
+  public void testUpsertDropOfOrderRecordIngestion(UpsertConfig upsertConfig)
+      throws Exception {
+    upsertConfig.setDropOutOfOrderRecord(true);
+    setup(upsertConfig);
+    ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
+    // Note: the offset column, not the time column, determines the ordering of records
+    Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 3);
+    Assert.assertFalse(bitmap.contains(0));
+    Assert.assertTrue(bitmap.contains(1));
+    Assert.assertTrue(bitmap.contains(2));
+  }
+
+  public void testUpsertOutOfOrderRecordColumnIngestion(UpsertConfig upsertConfig)
+      throws Exception {
+    String outOfOrderRecordColumn = "outOfOrderRecordColumn";
+    upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
+    setup(upsertConfig);
     ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
     // note offset column is used for determining sequence but not time column
+    Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4);
     Assert.assertFalse(bitmap.contains(0));
     Assert.assertTrue(bitmap.contains(1));
     Assert.assertTrue(bitmap.contains(2));
     Assert.assertFalse(bitmap.contains(3));
+
+    Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(0, outOfOrderRecordColumn)));
+    Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(1, outOfOrderRecordColumn)));
+    Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(2, outOfOrderRecordColumn)));
+    Assert.assertTrue(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(3, outOfOrderRecordColumn)));
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index feaa8da304..b212a8b918 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -91,7 +91,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   public void testStartFinishOperation() {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
 
     // Start 2 operations
@@ -205,7 +205,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -368,7 +368,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, false, 0,
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0,
             INDEX_DIR, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -660,7 +660,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -751,10 +751,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
   private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
       throws IOException {
     String comparisonColumn = "timeCol";
-    // here dropOutOfOrderRecord = true
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, true, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -773,10 +772,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
     MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
 
-    // new record, should return true to add it
-    boolean shouldAddRecord =
-        upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
-    assertTrue(shouldAddRecord);
+    // new record: addRecord() returns true, so it is not flagged as an out-of-order event
+    boolean isOutOfOrderRecord =
+        !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
+    assertFalse(isOutOfOrderRecord);
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
@@ -787,15 +786,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
 
-    // send an out-of-order event, should return false to drop event
-    shouldAddRecord =
-        upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false));
-    assertFalse(shouldAddRecord);
+    // send an out-of-order event: addRecord() returns false, so it is flagged as out-of-order
+    isOutOfOrderRecord =
+        !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false));
+    assertTrue(isOutOfOrderRecord);
 
     // ordered event for an existing key
-    shouldAddRecord =
-        upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false));
-    assertTrue(shouldAddRecord);
+    isOutOfOrderRecord =
+        !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false));
+    assertFalse(isOutOfOrderRecord);
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 3 -> {0, 100}, 2 -> {1, 150}
@@ -822,7 +821,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String comparisonColumn = "timeCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
+            Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
             mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -878,8 +877,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, false, 0, INDEX_DIR,
-            mock(ServerMetrics.class));
+            Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
+            false, 0, INDEX_DIR, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
 
     // queryableDocIds is same as validDocIds in the absence of delete markers
@@ -980,7 +979,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
             mock(ServerMetrics.class));
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1048,7 +1047,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
             mock(ServerMetrics.class));
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1121,7 +1120,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
             INDEX_DIR, mock(ServerMetrics.class));
     Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
     Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -1206,7 +1205,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     String deleteRecordColumn = "deleteCol";
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+            Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
             INDEX_DIR, mock(ServerMetrics.class));
 
     try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
@@ -1238,7 +1237,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
 
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
             mock(ServerMetrics.class));
     Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1299,7 +1298,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
-            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 10, INDEX_DIR,
+            Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
             mock(ServerMetrics.class));
 
     double currentTimeMs = System.currentTimeMillis();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 1ad4cc74d6..4f691188f6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1694,6 +1694,49 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       Assert.fail("Shouldn't fail table creation when delete column type is boolean.");
     }
+
+    // upsert out-of-order configs
+    String outOfOrderRecordColumn = "outOfOrderRecordColumn";
+    boolean dropOutOfOrderRecord = true;
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension(outOfOrderRecordColumn, FieldSpec.DataType.BOOLEAN).build();
+    streamConfigs = getStreamConfigs();
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setDropOutOfOrderRecord(dropOutOfOrderRecord);
+    upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table");
+    }
+
+    // outOfOrderRecordColumn not of type BOOLEAN
+    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+        .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+        .addSingleValueDimension(outOfOrderRecordColumn, FieldSpec.DataType.STRING).build();
+    streamConfigs = getStreamConfigs();
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+    upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+    upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+        .setUpsertConfig(upsertConfig)
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .build();
+    try {
+      TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.assertEquals(e.getMessage(),
+          "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
+    }
   }
 
   @Test
diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json b/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json
index 4448f694d1..692ea60daf 100644
--- a/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json
+++ b/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json
@@ -12,6 +12,10 @@
     {
       "name": "offset",
       "dataType": "LONG"
+    },
+    {
+      "name": "outOfOrderRecordColumn",
+      "dataType": "BOOLEAN"
     }
   ],
   "timeFieldSpec": {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index c71b5481d5..19f6397ddb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -392,6 +392,17 @@ public class TableConfig extends BaseJsonConfig {
     return _upsertConfig == null ? null : _upsertConfig.getDeleteRecordColumn();
   }
 
+  @JsonIgnore
+  @Nullable
+  public String getOutOfOrderRecordColumn() {
+    return _upsertConfig == null ? null : _upsertConfig.getOutOfOrderRecordColumn();
+  }
+
+  @JsonIgnore
+  public boolean isDropOutOfOrderRecord() {
+    return _upsertConfig != null && _upsertConfig.isDropOutOfOrderRecord();
+  }
+
   @JsonProperty(TUNER_CONFIG_LIST_KEY)
   public List<TunerConfig> getTunerConfigsList() {
     return _tunerConfigList;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 2b389b8133..3f9b67c59c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -57,6 +57,9 @@ public class UpsertConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Boolean column to indicate whether a records should be deleted")
   private String _deleteRecordColumn;
 
+  @JsonPropertyDescription("Boolean column to indicate whether a records is out-of-order")
+  private String _outOfOrderRecordColumn;
+
   @JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
   private boolean _enableSnapshot;
 
@@ -113,6 +116,11 @@ public class UpsertConfig extends BaseJsonConfig {
     return _deleteRecordColumn;
   }
 
+  @Nullable
+  public String getOutOfOrderRecordColumn() {
+    return _outOfOrderRecordColumn;
+  }
+
   public boolean isEnableSnapshot() {
     return _enableSnapshot;
   }
@@ -184,9 +192,11 @@ public class UpsertConfig extends BaseJsonConfig {
   }
 
   public void setDeleteRecordColumn(String deleteRecordColumn) {
-    if (deleteRecordColumn != null) {
-      _deleteRecordColumn = deleteRecordColumn;
-    }
+    _deleteRecordColumn = deleteRecordColumn;
+  }
+
+  public void setOutOfOrderRecordColumn(String outOfOrderRecordColumn) {
+    _outOfOrderRecordColumn = outOfOrderRecordColumn;
   }
 
   public void setEnableSnapshot(boolean enableSnapshot) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java
index aeee5286c0..07566bd14b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java
@@ -64,6 +64,17 @@ public class BooleanUtils {
     return toBoolean(booleanString) ? INTERNAL_TRUE : INTERNAL_FALSE;
   }
 
+  /**
+   * Returns the int value (1 for true, 0 for false) for the given boolean value.
+   * <ul>
+   *   <li> 'true' -> '1'</li>
+   *   <li> 'false' -> '0'</li>
+   * </ul>
+   */
+  public static int toInt(boolean booleanValue) {
+    return booleanValue ? INTERNAL_TRUE : INTERNAL_FALSE;
+  }
+
   /**
    * Returns the boolean value for the given non-null Integer object (internal value for BOOLEAN).
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org