You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/17 21:33:07 UTC
(pinot) branch master updated: Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c1fdb66ae8 Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)
c1fdb66ae8 is described below
commit c1fdb66ae8f8a0127e90f029739abc23229abe11
Author: Pratik Tibrewal <ti...@uber.com>
AuthorDate: Sat Nov 18 03:03:01 2023 +0530
Add upsert config - outOfOrderRecordColumn to track out-of-order events (#11877)
---
.../realtime/RealtimeSegmentDataManager.java | 2 +
...adataAndDictionaryAggregationPlanMakerTest.java | 4 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 12 +++-
.../local/realtime/impl/RealtimeSegmentConfig.java | 28 +++++++-
.../upsert/BasePartitionUpsertMetadataManager.java | 8 ++-
.../upsert/BaseTableUpsertMetadataManager.java | 2 -
...oncurrentMapPartitionUpsertMetadataManager.java | 19 +++---
.../ConcurrentMapTableUpsertMetadataManager.java | 2 +-
.../segment/local/utils/TableConfigUtils.java | 12 ++++
.../mutable/MutableSegmentImplTestUtils.java | 6 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 78 +++++++++++++++++++---
...rrentMapPartitionUpsertMetadataManagerTest.java | 51 +++++++-------
.../segment/local/utils/TableConfigUtilsTest.java | 43 ++++++++++++
.../data/test_upsert_comparison_col_schema.json | 4 ++
.../apache/pinot/spi/config/table/TableConfig.java | 11 +++
.../pinot/spi/config/table/UpsertConfig.java | 16 ++++-
.../org/apache/pinot/spi/utils/BooleanUtils.java | 11 +++
17 files changed, 253 insertions(+), 56 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 087f8c2cd2..3b66bc97fc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1438,6 +1438,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
.setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
+ .setUpsertOutOfOrderRecordColumn(tableConfig.getOutOfOrderRecordColumn())
+ .setUpsertDropOutOfOrderRecord(tableConfig.isDropOutOfOrderRecord())
.setFieldConfigList(tableConfig.getFieldConfigList());
// Create message decoder
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 9db6b12632..807ef5f9a0 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
- serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
+ Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null,
+ false, 0, INDEX_DIR, serverMetrics), new ThreadSafeMutableRoaringBitmap(), null);
}
@AfterClass
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 7adeae3d7b..c47d420c82 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -165,6 +165,8 @@ public class MutableSegmentImpl implements MutableSegment {
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final List<String> _upsertComparisonColumns;
private final String _deleteRecordColumn;
+ private final String _upsertOutOfOrderRecordColumn;
+ private final boolean _upsertDropOutOfOrderRecord;
// The valid doc ids are maintained locally instead of in the upsert metadata manager because:
// 1. There is only one consuming segment per partition, the committed segments do not need to modify the valid doc
// ids for the consuming segment.
@@ -381,6 +383,8 @@ public class MutableSegmentImpl implements MutableSegment {
_upsertComparisonColumns =
upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
_deleteRecordColumn = config.getUpsertDeleteRecordColumn();
+ _upsertOutOfOrderRecordColumn = config.getUpsertOutOfOrderRecordColumn();
+ _upsertDropOutOfOrderRecord = config.isUpsertDropOutOfOrderRecord();
_validDocIds = new ThreadSafeMutableRoaringBitmap();
if (_deleteRecordColumn != null) {
_queryableDocIds = new ThreadSafeMutableRoaringBitmap();
@@ -392,6 +396,8 @@ public class MutableSegmentImpl implements MutableSegment {
_deleteRecordColumn = null;
_validDocIds = null;
_queryableDocIds = null;
+ _upsertOutOfOrderRecordColumn = null;
+ _upsertDropOutOfOrderRecord = false;
}
}
@@ -496,7 +502,11 @@ public class MutableSegmentImpl implements MutableSegment {
// segment indexing or addNewRow call errors out in those scenario, there can be metadata inconsistency where
// a key is pointing to some other key's docID
// TODO fix this metadata mismatch scenario
- if (_partitionUpsertMetadataManager.addRecord(this, recordInfo)) {
+ boolean isOutOfOrderRecord = !_partitionUpsertMetadataManager.addRecord(this, recordInfo);
+ if (_upsertOutOfOrderRecordColumn != null) {
+ updatedRow.putValue(_upsertOutOfOrderRecordColumn, BooleanUtils.toInt(isOutOfOrderRecord));
+ }
+ if (!isOutOfOrderRecord || !_upsertDropOutOfOrderRecord) {
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
// Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 2ad4a177d4..a95a68893c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -63,6 +63,8 @@ public class RealtimeSegmentConfig {
private final UpsertConfig.Mode _upsertMode;
private final List<String> _upsertComparisonColumns;
private final String _upsertDeleteRecordColumn;
+ private final String _upsertOutOfOrderRecordColumn;
+ private final boolean _upsertDropOutOfOrderRecord;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final String _consumerDir;
@@ -76,7 +78,8 @@ public class RealtimeSegmentConfig {
PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
- String upsertDeleteRecordColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+ String upsertDeleteRecordColumn, String upsertOutOfOrderRecordColumn, boolean upsertDropOutOfOrderRecord,
+ PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
_tableNameWithType = tableNameWithType;
@@ -100,6 +103,8 @@ public class RealtimeSegmentConfig {
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
_upsertComparisonColumns = upsertComparisonColumns;
_upsertDeleteRecordColumn = upsertDeleteRecordColumn;
+ _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
+ _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_fieldConfigList = fieldConfigList;
@@ -195,6 +200,14 @@ public class RealtimeSegmentConfig {
return _upsertDeleteRecordColumn;
}
+ public String getUpsertOutOfOrderRecordColumn() {
+ return _upsertOutOfOrderRecordColumn;
+ }
+
+ public boolean isUpsertDropOutOfOrderRecord() {
+ return _upsertDropOutOfOrderRecord;
+ }
+
public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
return _partitionUpsertMetadataManager;
}
@@ -233,6 +246,8 @@ public class RealtimeSegmentConfig {
private UpsertConfig.Mode _upsertMode;
private List<String> _upsertComparisonColumns;
private String _upsertDeleteRecordColumn;
+ private String _upsertOutOfOrderRecordColumn;
+ private boolean _upsertDropOutOfOrderRecord;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
private List<FieldConfig> _fieldConfigList;
@@ -373,6 +388,16 @@ public class RealtimeSegmentConfig {
return this;
}
+ public Builder setUpsertOutOfOrderRecordColumn(String upsertOutOfOrderRecordColumn) {
+ _upsertOutOfOrderRecordColumn = upsertOutOfOrderRecordColumn;
+ return this;
+ }
+
+ public Builder setUpsertDropOutOfOrderRecord(boolean upsertDropOutOfOrderRecord) {
+ _upsertDropOutOfOrderRecord = upsertDropOutOfOrderRecord;
+ return this;
+ }
+
public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
return this;
@@ -403,6 +428,7 @@ public class RealtimeSegmentConfig {
_capacity, _avgNumMultiValues, Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
_nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _upsertDeleteRecordColumn,
+ _upsertOutOfOrderRecordColumn, _upsertDropOutOfOrderRecord,
_partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
_ingestionAggregationConfigs);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 7d3de0346e..0e54d399bf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -71,7 +71,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
protected final double _metadataTTL;
- protected final boolean _dropOutOfOrderRecord;
protected final File _tableIndexDir;
protected final ServerMetrics _serverMetrics;
protected final Logger _logger;
@@ -99,7 +98,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
@@ -109,7 +108,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
_metadataTTL = metadataTTL;
- _dropOutOfOrderRecord = dropOutOfOrderRecord;
_tableIndexDir = tableIndexDir;
_snapshotLock = enableSnapshot ? new ReentrantReadWriteLock() : null;
_serverMetrics = serverMetrics;
@@ -362,6 +360,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
}
+ /**
+ Returns {@code true} when the record is added to the upsert metadata manager,
+ {@code false} when the record is out-of-order thus not added.
+ */
protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo);
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 7b6c66c0ae..bc783480b9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -73,7 +73,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected ServerMetrics _serverMetrics;
protected HelixManager _helixManager;
protected ExecutorService _segmentPreloadExecutor;
- protected boolean _dropOutOfOrderRecord;
private volatile boolean _isPreloading = false;
@@ -112,7 +111,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
_enableSnapshot = upsertConfig.isEnableSnapshot();
_metadataTTL = upsertConfig.getMetadataTTL();
- _dropOutOfOrderRecord = upsertConfig.isDropOutOfOrderRecord();
_tableIndexDir = tableDataManager.getTableDataDir();
_serverMetrics = serverMetrics;
_helixManager = helixManager;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 47f2fa2a96..435bab20d1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -61,9 +61,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
- boolean dropOutOfOrderRecord, double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
- super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
- partialUpsertHandler, enableSnapshot, dropOutOfOrderRecord, metadataTTL, tableIndexDir, serverMetrics);
+ double metadataTTL, File tableIndexDir, ServerMetrics serverMetrics) {
+ super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn,
+ hashFunction, partialUpsertHandler, enableSnapshot, metadataTTL, tableIndexDir, serverMetrics);
}
@Override
@@ -240,9 +240,13 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
persistWatermark(_largestSeenComparisonValue);
}
+ /**
+ Returns {@code true} when the record is added to the upsert metadata manager,
+ {@code false} when the record is out-of-order thus not added.
+ */
@Override
protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
- AtomicBoolean shouldDropRecord = new AtomicBoolean(false);
+ AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
int newDocId = recordInfo.getDocId();
@@ -273,9 +277,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
- // this is a out-of-order record, if upsert config _dropOutOfOrderRecord is true, then set
- // shouldDropRecord to true. This method returns inverse of this value
- shouldDropRecord.set(_dropOutOfOrderRecord);
+ // this is an out-of-order record, so set the flag to true - the flag indicates whether the record is out-of-order
+ isOutOfOrderRecord.set(true);
return currentRecordLocation;
}
} else {
@@ -288,7 +291,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
// Update metrics
_serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
_primaryKeyToRecordLocationMap.size());
- return !shouldDropRecord.get();
+ return !isOutOfOrderRecord.get();
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 55db795883..3380203656 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,7 +37,7 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
_comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler,
- _enableSnapshot, _dropOutOfOrderRecord, _metadataTTL, _tableIndexDir, _serverMetrics));
+ _enableSnapshot, _metadataTTL, _tableIndexDir, _serverMetrics));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index f6d9ea957b..7605925432 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -723,6 +723,18 @@ public final class TableConfigUtils {
fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
"The delete record column must be a single-valued BOOLEAN column");
}
+
+ String outOfOrderRecordColumn = upsertConfig.getOutOfOrderRecordColumn();
+ Preconditions.checkState(
+ outOfOrderRecordColumn == null || !upsertConfig.isDropOutOfOrderRecord(),
+ "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table");
+
+ if (outOfOrderRecordColumn != null) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(outOfOrderRecordColumn);
+ Preconditions.checkState(
+ fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
+ "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
+ }
}
Preconditions.checkState(
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 10270e5b6f..ade22fcad6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -100,6 +100,8 @@ public class MutableSegmentImplTestUtils {
UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode();
List<String> comparisonColumns = upsertConfig == null ? null : upsertConfig.getComparisonColumns();
+ boolean isUpsertDropOutOfOrderRecord = upsertConfig == null ? false : upsertConfig.isDropOutOfOrderRecord();
+ String upsertOutOfOrderRecordColumn = upsertConfig == null ? null : upsertConfig.getOutOfOrderRecordColumn();
DictionaryIndexConfig varLengthDictConf = new DictionaryIndexConfig(false, true);
RealtimeSegmentConfig.Builder segmentConfBuilder = new RealtimeSegmentConfig.Builder()
.setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
@@ -114,7 +116,9 @@ public class MutableSegmentImplTestUtils {
.setUpsertComparisonColumns(comparisonColumns)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
- .setIngestionAggregationConfigs(aggregationConfigs);
+ .setIngestionAggregationConfigs(aggregationConfigs)
+ .setUpsertDropOutOfOrderRecord(isUpsertDropOutOfOrderRecord)
+ .setUpsertOutOfOrderRecordColumn(upsertOutOfOrderRecordColumn);
for (Map.Entry<String, JsonIndexConfig> entry : jsonIndexConfigs.entrySet()) {
segmentConfBuilder.setIndex(entry.getKey(), StandardIndexes.json(), entry.getValue());
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index aae9b9cec6..fe17e40dc2 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManagerFactory;
+import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -37,10 +38,10 @@ import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -55,16 +56,20 @@ public class MutableSegmentImplUpsertComparisonColTest {
private static MutableSegmentImpl _mutableSegmentImpl;
private static PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
- @BeforeClass
- public void setup()
+ private UpsertConfig createFullUpsertConfig(HashFunction hashFunction) {
+ UpsertConfig upsertConfigWithHash = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfigWithHash.setHashFunction(hashFunction);
+ upsertConfigWithHash.setComparisonColumn("offset");
+ return upsertConfigWithHash;
+ }
+
+ public void setup(UpsertConfig upsertConfig)
throws Exception {
URL schemaResourceUrl = this.getClass().getClassLoader().getResource(SCHEMA_FILE_PATH);
URL dataResourceUrl = this.getClass().getClassLoader().getResource(DATA_FILE_PATH);
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
- UpsertConfig offsetUpsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
- offsetUpsertConfig.setComparisonColumn("offset");
_tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(offsetUpsertConfig)
+ new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfig)
.build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
@@ -74,7 +79,7 @@ public class MutableSegmentImplUpsertComparisonColTest {
_partitionUpsertMetadataManager = tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
- Collections.emptySet(), false, true, offsetUpsertConfig, "secondsSinceEpoch",
+ Collections.emptySet(), false, true, upsertConfig, "secondsSinceEpoch",
_partitionUpsertMetadataManager, null);
GenericRow reuse = new GenericRow();
try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile,
@@ -89,12 +94,69 @@ public class MutableSegmentImplUpsertComparisonColTest {
}
@Test
- public void testUpsertIngestion() {
+ public void testHashFunctions()
+ throws Exception {
+ testUpsertIngestion(createFullUpsertConfig(HashFunction.NONE));
+ testUpsertIngestion(createFullUpsertConfig(HashFunction.MD5));
+ testUpsertIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
+ }
+
+ @Test
+ public void testUpsertDropOutOfOrderRecord()
+ throws Exception {
+ testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.NONE));
+ testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.MD5));
+ testUpsertDropOfOrderRecordIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
+ }
+
+ @Test
+ public void testUpsertOutOfOrderRecordColumn()
+ throws Exception {
+ testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.NONE));
+ testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.MD5));
+ testUpsertOutOfOrderRecordColumnIngestion(createFullUpsertConfig(HashFunction.MURMUR3));
+ }
+
+ public void testUpsertIngestion(UpsertConfig upsertConfig)
+ throws Exception {
+ setup(upsertConfig);
+ ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
+ // note offset column is used for determining sequence but not time column
+ Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4);
+ Assert.assertFalse(bitmap.contains(0));
+ Assert.assertTrue(bitmap.contains(1));
+ Assert.assertTrue(bitmap.contains(2));
+ Assert.assertFalse(bitmap.contains(3));
+ }
+
+ public void testUpsertDropOfOrderRecordIngestion(UpsertConfig upsertConfig)
+ throws Exception {
+ upsertConfig.setDropOutOfOrderRecord(true);
+ setup(upsertConfig);
+ ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
+ // note offset column is used for determining sequence but not time column
+ Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 3);
+ Assert.assertFalse(bitmap.contains(0));
+ Assert.assertTrue(bitmap.contains(1));
+ Assert.assertTrue(bitmap.contains(2));
+ }
+
+ public void testUpsertOutOfOrderRecordColumnIngestion(UpsertConfig upsertConfig)
+ throws Exception {
+ String outOfOrderRecordColumn = "outOfOrderRecordColumn";
+ upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
+ setup(upsertConfig);
ImmutableRoaringBitmap bitmap = _mutableSegmentImpl.getValidDocIds().getMutableRoaringBitmap();
// note offset column is used for determining sequence but not time column
+ Assert.assertEquals(_mutableSegmentImpl.getNumDocsIndexed(), 4);
Assert.assertFalse(bitmap.contains(0));
Assert.assertTrue(bitmap.contains(1));
Assert.assertTrue(bitmap.contains(2));
Assert.assertFalse(bitmap.contains(3));
+
+ Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(0, outOfOrderRecordColumn)));
+ Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(1, outOfOrderRecordColumn)));
+ Assert.assertFalse(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(2, outOfOrderRecordColumn)));
+ Assert.assertTrue(BooleanUtils.toBoolean(_mutableSegmentImpl.getValue(3, outOfOrderRecordColumn)));
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index feaa8da304..b212a8b918 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -91,7 +91,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
public void testStartFinishOperation() {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 0, INDEX_DIR,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
// Start 2 operations
@@ -205,7 +205,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -368,7 +368,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, false, 0,
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false, 0,
INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -660,7 +660,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -751,10 +751,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
private void verifyAddOutOfOrderRecord(HashFunction hashFunction)
throws IOException {
String comparisonColumn = "timeCol";
- // here dropOutOfOrderRecord = true
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, true, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -773,10 +772,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
- // new record, should return true to add it
- boolean shouldAddRecord =
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
- assertTrue(shouldAddRecord);
+ // new primary key: addRecord returns true, so the record is not flagged as out-of-order
+ boolean isOutOfOrderRecord =
+ !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
+ assertFalse(isOutOfOrderRecord);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
@@ -787,15 +786,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
- // send an out-of-order event, should return false to drop event
- shouldAddRecord =
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false));
- assertFalse(shouldAddRecord);
+ // out-of-order event for an existing key (80 < 100): addRecord returns false, so the record is flagged as out-of-order
+ isOutOfOrderRecord =
+ !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(80), false));
+ assertTrue(isOutOfOrderRecord);
// ordered event for an existing key
- shouldAddRecord =
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false));
- assertTrue(shouldAddRecord);
+ isOutOfOrderRecord =
+ !upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(150), false));
+ assertFalse(isOutOfOrderRecord);
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 3 -> {0, 100}, 2 -> {1, 150}
@@ -822,7 +821,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), null, hashFunction, null, false, false, 0, INDEX_DIR,
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, 0, INDEX_DIR,
mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -878,8 +877,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null, false, false, 0, INDEX_DIR,
- mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
+ false, 0, INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// queryableDocIds is same as validDocIds in the absence of delete markers
@@ -980,7 +979,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, false, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, false, 30, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1048,7 +1047,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1121,7 +1120,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
INDEX_DIR, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -1206,7 +1205,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String deleteRecordColumn = "deleteCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, false, 30,
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, HashFunction.NONE, null, true, 30,
INDEX_DIR, mock(ServerMetrics.class));
try (MockedConstruction<PinotSegmentColumnReader> deleteColReader = mockConstruction(PinotSegmentColumnReader.class,
@@ -1238,7 +1237,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 30, tableDir,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 30, tableDir,
mock(ServerMetrics.class));
Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap =
upsertMetadataManager._primaryKeyToRecordLocationMap;
@@ -1299,7 +1298,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
throws IOException {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, false, 10, INDEX_DIR,
+ Collections.singletonList("timeCol"), null, HashFunction.NONE, null, true, 10, INDEX_DIR,
mock(ServerMetrics.class));
double currentTimeMs = System.currentTimeMillis();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 1ad4cc74d6..4f691188f6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1694,6 +1694,49 @@ public class TableConfigUtilsTest {
} catch (IllegalStateException e) {
Assert.fail("Shouldn't fail table creation when delete column type is boolean.");
}
+
+ // outOfOrderRecordColumn and dropOutOfOrderRecord must not be configured together
+ String outOfOrderRecordColumn = "outOfOrderRecordColumn";
+ boolean dropOutOfOrderRecord = true;
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension(outOfOrderRecordColumn, FieldSpec.DataType.BOOLEAN).build();
+ streamConfigs = getStreamConfigs();
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDropOutOfOrderRecord(dropOutOfOrderRecord);
+ upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "outOfOrderRecordColumn and dropOutOfOrderRecord shouldn't exist together for upsert table");
+ }
+
+ // outOfOrderRecordColumn not of type BOOLEAN
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension(outOfOrderRecordColumn, FieldSpec.DataType.STRING).build();
+ streamConfigs = getStreamConfigs();
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setOutOfOrderRecordColumn(outOfOrderRecordColumn);
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(),
+ "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
+ }
}
@Test
diff --git a/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json b/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json
index 4448f694d1..692ea60daf 100644
--- a/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json
+++ b/pinot-segment-local/src/test/resources/data/test_upsert_comparison_col_schema.json
@@ -12,6 +12,10 @@
{
"name": "offset",
"dataType": "LONG"
+ },
+ {
+ "name": "outOfOrderRecordColumn",
+ "dataType": "BOOLEAN"
}
],
"timeFieldSpec": {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index c71b5481d5..19f6397ddb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -392,6 +392,17 @@ public class TableConfig extends BaseJsonConfig {
return _upsertConfig == null ? null : _upsertConfig.getDeleteRecordColumn();
}
+ @JsonIgnore
+ @Nullable
+ public String getOutOfOrderRecordColumn() {
+ return _upsertConfig == null ? null : _upsertConfig.getOutOfOrderRecordColumn();
+ }
+
+ @JsonIgnore
+ public boolean isDropOutOfOrderRecord() {
+ return _upsertConfig != null && _upsertConfig.isDropOutOfOrderRecord();
+ }
+
@JsonProperty(TUNER_CONFIG_LIST_KEY)
public List<TunerConfig> getTunerConfigsList() {
return _tunerConfigList;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index 2b389b8133..3f9b67c59c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -57,6 +57,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Boolean column to indicate whether a records should be deleted")
private String _deleteRecordColumn;
+ @JsonPropertyDescription("Boolean column to indicate whether a records is out-of-order")
+ private String _outOfOrderRecordColumn;
+
@JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
private boolean _enableSnapshot;
@@ -113,6 +116,11 @@ public class UpsertConfig extends BaseJsonConfig {
return _deleteRecordColumn;
}
+ @Nullable
+ public String getOutOfOrderRecordColumn() {
+ return _outOfOrderRecordColumn;
+ }
+
public boolean isEnableSnapshot() {
return _enableSnapshot;
}
@@ -184,9 +192,11 @@ public class UpsertConfig extends BaseJsonConfig {
}
public void setDeleteRecordColumn(String deleteRecordColumn) {
- if (deleteRecordColumn != null) {
- _deleteRecordColumn = deleteRecordColumn;
- }
+ _deleteRecordColumn = deleteRecordColumn;
+ }
+
+ public void setOutOfOrderRecordColumn(String outOfOrderRecordColumn) {
+ _outOfOrderRecordColumn = outOfOrderRecordColumn;
}
public void setEnableSnapshot(boolean enableSnapshot) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java
index aeee5286c0..07566bd14b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/BooleanUtils.java
@@ -64,6 +64,17 @@ public class BooleanUtils {
return toBoolean(booleanString) ? INTERNAL_TRUE : INTERNAL_FALSE;
}
+ /**
+ * Returns the int value (1 for true, 0 for false) for the given boolean value.
+ * <ul>
+ * <li> 'true' -> '1'</li>
+ * <li> 'false' -> '0'</li>
+ * </ul>
+ */
+ public static int toInt(boolean booleanValue) {
+ return booleanValue ? INTERNAL_TRUE : INTERNAL_FALSE;
+ }
+
/**
* Returns the boolean value for the given non-null Integer object (internal value for BOOLEAN).
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org